Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,12 @@ const EnvironmentSchema = z
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(),

// TTL System settings for automatic run expiration
RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false),
RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(),
RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000),
RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),

RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ function createRunEngine() {
scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS,
processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS,
},
ttlSystem: {
disabled: env.RUN_ENGINE_TTL_SYSTEM_DISABLED,
shardCount: env.RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT,
pollIntervalMs: env.RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS,
batchSize: env.RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE,
},
},
runLock: {
redis: {
Expand Down
74 changes: 74 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,51 @@ export class RunEngine {
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
meter: options.meter,
ttlSystem: options.queue?.ttlSystem?.disabled
? undefined
: {
shardCount: options.queue?.ttlSystem?.shardCount,
pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs,
batchSize: options.queue?.ttlSystem?.batchSize,
callback: this.#ttlExpiredCallback.bind(this),
},
// Run data provider for V3 optimized format - reads from PostgreSQL when no Redis message key exists
runDataProvider: {
getRunData: async (runId: string) => {
const run = await this.prisma.taskRun.findUnique({
where: { id: runId },
select: {
queue: true,
organizationId: true,
projectId: true,
runtimeEnvironmentId: true,
environmentType: true,
concurrencyKey: true,
attemptNumber: true,
queueTimestamp: true,
workerQueue: true,
taskIdentifier: true,
},
});

if (!run || !run.organizationId || !run.environmentType) {
return undefined;
}

return {
queue: run.queue,
orgId: run.organizationId,
projectId: run.projectId,
environmentId: run.runtimeEnvironmentId,
environmentType: run.environmentType,
concurrencyKey: run.concurrencyKey ?? undefined,
attempt: run.attemptNumber ?? 0,
timestamp: run.queueTimestamp?.getTime() ?? Date.now(),
workerQueue: run.workerQueue,
taskIdentifier: run.taskIdentifier,
};
},
},
});

this.worker = new Worker({
Expand Down Expand Up @@ -2066,6 +2111,35 @@ export class RunEngine {
}));
}

/**
* Callback for the TTL system when runs expire.
* Calls ttlSystem.expireRun() for each expired run to update database and emit events.
*/
async #ttlExpiredCallback(
runs: Array<{ queueKey: string; runId: string; orgId: string }>
): Promise<void> {
// Process expired runs concurrently with limited parallelism
await pMap(
runs,
async (run) => {
try {
await this.ttlSystem.expireRun({ runId: run.runId });
this.logger.debug("TTL system expired run", {
runId: run.runId,
orgId: run.orgId,
});
} catch (error) {
this.logger.error("Failed to expire run via TTL system", {
runId: run.runId,
orgId: run.orgId,
error,
});
}
},
{ concurrency: 10 }
);
}

/**
* Invalidates the billing cache for an organization when their plan changes
* Runs in background and handles all errors internally
Expand Down
11 changes: 11 additions & 0 deletions internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
TaskRun,
TaskRunExecutionStatus,
} from "@trigger.dev/database";
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
import { ExecutionSnapshotSystem } from "./executionSnapshotSystem.js";
import { SystemResources } from "./systems.js";
Expand Down Expand Up @@ -81,6 +82,15 @@ export class EnqueueSystem {

const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;

// Calculate TTL expiration timestamp if the run has a TTL
let ttlExpiresAt: number | undefined;
if (run.ttl) {
const expireAt = parseNaturalLanguageDuration(run.ttl);
if (expireAt) {
ttlExpiresAt = expireAt.getTime();
}
}

await this.$.runQueue.enqueueMessage({
env,
workerQueue,
Expand All @@ -95,6 +105,7 @@ export class EnqueueSystem {
concurrencyKey: run.concurrencyKey ?? undefined,
timestamp,
attempt: 0,
ttlExpiresAt,
},
});

Expand Down
11 changes: 11 additions & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ export type RunEngineOptions = {
scanJitterInMs?: number;
processMarkedJitterInMs?: number;
};
/** TTL system options for automatic run expiration */
ttlSystem?: {
/** Number of shards for TTL sorted sets (default: same as queue shards) */
shardCount?: number;
/** How often to poll each shard for expired runs (ms, default: 1000) */
pollIntervalMs?: number;
/** Max number of runs to expire per poll per shard (default: 100) */
batchSize?: number;
/** Whether TTL consumers are disabled (default: false) */
disabled?: boolean;
};
};
runLock: {
redis: RedisOptions;
Expand Down
Loading
Loading