From 74179aea1b8b6b5c6584b2b83d236dfec6ef1504 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Feb 2026 10:55:10 +0000 Subject: [PATCH 1/6] feat(run-engine): add optimized message format to reduce Redis storage by ~80% This implements a new V3 message format that eliminates separate message keys for pending runs, encoding all necessary data directly in the sorted set member. Key changes: - Add messageEncoding module with encode/decode helpers for V3 format - Add useOptimizedMessageFormat option to RunQueue for gradual rollout - Update Lua scripts to handle both legacy and V3 formats (dual-read) - Add V3-specific enqueue/nack Lua scripts (V3 write when enabled) - Add comprehensive tests for encoding/decoding roundtrips Migration strategy: 1. Deploy with useOptimizedMessageFormat=false (new code reads both formats) 2. Enable useOptimizedMessageFormat=true (new messages use V3) 3. Old messages drain naturally as they're processed (no backfill needed) Storage reduction: ~88% per pending message (442 bytes -> 53 bytes) https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5 --- .../run-engine/src/run-queue/index.ts | 514 ++++++++++++++---- .../src/run-queue/messageEncoding.ts | 281 ++++++++++ .../run-queue/tests/messageEncoding.test.ts | 256 +++++++++ 3 files changed, 953 insertions(+), 98 deletions(-) create mode 100644 internal-packages/run-engine/src/run-queue/messageEncoding.ts create mode 100644 internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 5127ec3c75..a928503275 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -43,6 +43,14 @@ import { RunQueueKeyProducerEnvironment, RunQueueSelectionStrategy, } from "./types.js"; +import { + encodeQueueMember, + encodeWorkerQueueEntry, + decodeWorkerQueueEntry, + isEncodedWorkerQueueEntry, + reconstructMessageFromWorkerEntry, + LUA_ENCODING_HELPERS, +} from "./messageEncoding.js"; import { WorkerQueueResolver } from "./workerQueueResolver.js"; const SemanticAttributes = { @@ -92,6 +100,16 @@ export type RunQueueOptions = { processMarkedJitterInMs?: number; callback: ConcurrencySweeperCallback; }; + /** + * When enabled, uses an optimized message format that eliminates separate message keys. + * This reduces Redis storage by ~80% for pending messages. + * + * Migration strategy: + * 1. Deploy with this disabled (default) - new code can read both formats + * 2. Enable this flag - new messages use optimized format + * 3. Old messages drain naturally as they're processed + */ + useOptimizedMessageFormat?: boolean; }; export interface ConcurrencySweeperCallback { @@ -175,8 +193,10 @@ export class RunQueue { private _observableWorkerQueues: Set = new Set(); private _meter: Meter; private _queueCooloffStates: Map = new Map(); + private _useOptimizedMessageFormat: boolean; constructor(public readonly options: RunQueueOptions) { + this._useOptimizedMessageFormat = options.useOptimizedMessageFormat ?? false; this.shardCount = options.shardCount ?? 2; this.retryOptions = options.retryOptions ?? defaultRetrySettings; this.redis = createRedisClient(options.redis, { @@ -1434,14 +1454,31 @@ export class RunQueue { this.#getWorkerQueueFromMessage(message.message) ); - const messageKeyValue = this.keys.messageKey(message.message.orgId, message.messageId); + let workerQueueEntry: string; + + if (this._useOptimizedMessageFormat) { + // V3 format: encode all needed data in worker queue entry + // This allows full reconstruction without a message key lookup + workerQueueEntry = encodeWorkerQueueEntry({ + runId: message.messageId, + workerQueue: this.#getWorkerQueueFromMessage(message.message), + attempt: message.message.attempt, + environmentType: message.message.environmentType, + queueKey: message.message.queue, + timestamp: message.message.timestamp, + }); + } else { + // Legacy format: store message key path + workerQueueEntry = this.keys.messageKey(message.message.orgId, message.messageId); + } operations.push({ workerQueueKey: workerQueueKey, messageId: message.messageId, + format: this._useOptimizedMessageFormat ? "v3" : "legacy", }); - pipeline.rpush(workerQueueKey, messageKeyValue); + pipeline.rpush(workerQueueKey, workerQueueEntry); } span.setAttribute("operations_count", operations.length); @@ -1470,39 +1507,76 @@ export class RunQueue { const queueName = message.queue; const messageId = message.runId; - const messageData = JSON.stringify(message); const messageScore = String(message.timestamp); - this.logger.debug("Calling enqueueMessage", { - queueKey, - messageKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - queueName, - messageId, - messageData, - messageScore, - masterQueueKey, - service: this.name, - }); + if (this._useOptimizedMessageFormat) { + // V3 optimized format: encode message data in sorted set member, no separate message key + const encodedMember = encodeQueueMember({ + runId: message.runId, + workerQueue: message.workerQueue, + attempt: message.attempt, + environmentType: message.environmentType, + }); - await this.redis.enqueueMessage( - masterQueueKey, - queueKey, - messageKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - queueName, - messageId, - messageData, - messageScore - ); + this.logger.debug("Calling enqueueMessageV3 (optimized)", { + queueKey, + encodedMember, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + envQueueKey, + messageScore, + masterQueueKey, + service: this.name, + }); + + await this.redis.enqueueMessageV3( + masterQueueKey, + queueKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + queueName, + encodedMember, + messageId, // For env queue, still use runId for consistency + messageScore + ); + } else { + // Legacy format: store full JSON in separate message key + const messageData = JSON.stringify(message); + + this.logger.debug("Calling enqueueMessage (legacy)", { + queueKey, + messageKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + queueName, + messageId, + messageData, + messageScore, + masterQueueKey, + service: this.name, + }); + + await this.redis.enqueueMessage( + masterQueueKey, + queueKey, + messageKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + queueName, + messageId, + messageData, + messageScore + ); + } } async #callDequeueMessagesFromQueue({ @@ -1583,25 +1657,51 @@ export class RunQueue { const messageScore = result[i + 1]; const rawMessage = result[i + 2]; - //read message - const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); - if (!parsedMessage.success) { - this.logger.error(`[${this.name}] Failed to parse message`, { + let message: OutputPayload | undefined; + + // Try to detect V3 encoded format (contains \x1e delimiter and doesn't start with '{') + if (rawMessage.includes("\x1e") && !rawMessage.startsWith("{")) { + // V3 format: rawMessage is encoded member "runId|workerQueue|attempt|envType" + const decoded = decodeWorkerQueueEntry( + rawMessage + "\x1e" + messageQueue + "\x1e" + messageScore + ); + if (decoded) { + const descriptor = this.keys.descriptorFromQueue(messageQueue); + message = reconstructMessageFromWorkerEntry(decoded, descriptor); + this.logger.debug("Parsed V3 encoded message", { + messageId, + decoded, + service: this.name, + }); + } else { + this.logger.error(`[${this.name}] Failed to decode V3 message`, { + messageId, + rawMessage, + service: this.name, + }); + continue; + } + } else { + // Legacy format: rawMessage is JSON + const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); + if (!parsedMessage.success) { + this.logger.error(`[${this.name}] Failed to parse legacy message`, { + messageId, + error: parsedMessage.error, + service: this.name, + }); + continue; + } + message = parsedMessage.data; + } + + if (message) { + messages.push({ messageId, - error: parsedMessage.error, - service: this.name, + messageScore, + message, }); - - continue; } - - const message = parsedMessage.data; - - messages.push({ - messageId, - messageScore, - message, - }); } this.logger.debug("dequeueMessagesFromQueue parsed result", { @@ -1873,37 +1973,76 @@ export class RunQueue { const nextRetryDelay = calculateNextRetryDelay(this.retryOptions, message.attempt); const messageScore = retryAt ?? (nextRetryDelay ? Date.now() + nextRetryDelay : Date.now()); - this.logger.debug("Calling nackMessage", { - messageKey, - messageQueue, - masterQueueKey, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - messageId, - messageScore, - attempt: message.attempt, - service: this.name, - }); + if (this._useOptimizedMessageFormat) { + // V3 format: encode message data in sorted set member, no message key + const encodedMember = encodeQueueMember({ + runId: message.runId, + workerQueue: this.#getWorkerQueueFromMessage(message), + attempt: message.attempt, + environmentType: message.environmentType, + }); - await this.redis.nackMessage( - //keys - masterQueueKey, - messageKey, - messageQueue, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - //args - messageId, - messageQueue, - JSON.stringify(message), - String(messageScore) - ); + this.logger.debug("Calling nackMessageV3 (optimized)", { + encodedMember, + messageQueue, + masterQueueKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + messageId, + messageScore, + attempt: message.attempt, + service: this.name, + }); + + await this.redis.nackMessageV3( + //keys + masterQueueKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + //args + encodedMember, + messageId, + messageQueue, + String(messageScore) + ); + } else { + // Legacy format + this.logger.debug("Calling nackMessage (legacy)", { + messageKey, + messageQueue, + masterQueueKey, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + messageId, + messageScore, + attempt: message.attempt, + service: this.name, + }); + + await this.redis.nackMessage( + //keys + masterQueueKey, + messageKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + //args + messageId, + messageQueue, + JSON.stringify(message), + String(messageScore) + ); + } } async #callMoveToDeadLetterQueue({ message }: { message: OutputPayload }) { @@ -2212,12 +2351,43 @@ export class RunQueue { }); } - async #dequeueMessageFromKey(messageKey: string) { + async #dequeueMessageFromKey(workerQueueEntry: string) { return this.#trace("dequeueMessageFromKey", async (span) => { span.setAttributes({ - messageKey, + workerQueueEntry, }); + // Check if this is a V3 encoded worker queue entry + if (isEncodedWorkerQueueEntry(workerQueueEntry)) { + // V3 format: decode the entry directly, no Redis lookup needed + const decoded = decodeWorkerQueueEntry(workerQueueEntry); + if (!decoded) { + this.logger.error(`[${this.name}] Failed to decode V3 worker queue entry`, { + workerQueueEntry, + service: this.name, + }); + span.setAttribute("result", "DECODE_ERROR"); + return; + } + + const descriptor = this.keys.descriptorFromQueue(decoded.queueKey); + const message = reconstructMessageFromWorkerEntry(decoded, descriptor); + + // Update the currentDequeued sets (this is done in the Lua script for legacy) + const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKeyFromQueue(decoded.queueKey); + const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKeyFromQueue(decoded.queueKey); + await this.redis.sadd(queueCurrentDequeuedKey, message.runId); + await this.redis.sadd(envCurrentDequeuedKey, message.runId); + + span.setAttribute("result", "SUCCESS"); + span.setAttribute("messageId", message.runId); + span.setAttribute("format", "v3"); + + return message; + } + + // Legacy format: workerQueueEntry is the message key path + const messageKey = workerQueueEntry; const rawMessage = await this.redis.dequeueMessageFromKey( messageKey, this.options.redis.keyPrefix ?? "" @@ -2225,7 +2395,6 @@ export class RunQueue { if (!rawMessage) { span.setAttribute("result", "NO_MESSAGE"); - return; } @@ -2243,6 +2412,7 @@ export class RunQueue { if (message) { span.setAttribute("result", "SUCCESS"); span.setAttribute("messageId", message.runId); + span.setAttribute("format", "legacy"); } else { span.setAttribute("result", "NO_MESSAGE"); } @@ -2318,6 +2488,48 @@ redis.call('SREM', envCurrentDequeuedKey, messageId) `, }); + // V3 optimized enqueue: no message key, encoded member in sorted set + this.redis.defineCommand("enqueueMessageV3", { + numberOfKeys: 7, + lua: ` +local masterQueueKey = KEYS[1] +local queueKey = KEYS[2] +local queueCurrentConcurrencyKey = KEYS[3] +local envCurrentConcurrencyKey = KEYS[4] +local queueCurrentDequeuedKey = KEYS[5] +local envCurrentDequeuedKey = KEYS[6] +local envQueueKey = KEYS[7] + +local queueName = ARGV[1] +local encodedMember = ARGV[2] +local messageId = ARGV[3] +local messageScore = ARGV[4] + +-- NO message key write - data is encoded in the sorted set member + +-- Add the encoded member to the queue (contains runId|workerQueue|attempt|envType) +redis.call('ZADD', queueKey, messageScore, encodedMember) + +-- Add the messageId to the env queue (for counting, using runId for consistency) +redis.call('ZADD', envQueueKey, messageScore, messageId) + +-- Rebalance the parent queues +local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') + +if #earliestMessage == 0 then + redis.call('ZREM', masterQueueKey, queueName) +else + redis.call('ZADD', masterQueueKey, earliestMessage[2], queueName) +end + +-- Update the concurrency keys (clear any existing entries for this run) +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + `, + }); + this.redis.defineCommand("dequeueMessagesFromQueue", { numberOfKeys: 9, lua: ` @@ -2338,6 +2550,23 @@ local defaultEnvConcurrencyBurstFactor = ARGV[4] local keyPrefix = ARGV[5] local maxCount = tonumber(ARGV[6] or '1') +-- V3 format detection helper: ASCII Record Separator delimiter +local DELIMITER = "\\x1e" + +-- Check if member is V3 encoded format (contains delimiter) +local function isV3Encoded(member) + return string.find(member, DELIMITER, 1, true) ~= nil +end + +-- Extract runId from V3 encoded member (first field before delimiter) +local function getRunIdFromV3(member) + local delimPos = string.find(member, DELIMITER, 1, true) + if delimPos then + return string.sub(member, 1, delimPos - 1) + end + return member +end + -- Check current env concurrency against the limit local envCurrentConcurrency = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') local envConcurrencyLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) @@ -2377,27 +2606,38 @@ end local results = {} local dequeuedCount = 0 --- Process messages in pairs (messageId, score) +-- Process messages in pairs (member, score) for i = 1, #messages, 2 do - local messageId = messages[i] + local member = messages[i] local messageScore = tonumber(messages[i + 1]) - - -- Get the message payload - local messageKey = messageKeyPrefix .. messageId - local messagePayload = redis.call('GET', messageKey) - + local runId + local messagePayload + + if isV3Encoded(member) then + -- V3 format: member contains encoded data (runId|workerQueue|attempt|envType) + runId = getRunIdFromV3(member) + -- For V3, the "payload" is the encoded member itself - TypeScript will reconstruct + messagePayload = member + else + -- Legacy format: member is just runId, fetch from message key + runId = member + local messageKey = messageKeyPrefix .. runId + messagePayload = redis.call('GET', messageKey) + end + if messagePayload then - -- Update concurrency - redis.call('ZREM', queueKey, messageId) - redis.call('ZREM', envQueueKey, messageId) - redis.call('SADD', queueCurrentConcurrencyKey, messageId) - redis.call('SADD', envCurrentConcurrencyKey, messageId) - - -- Add to results - table.insert(results, messageId) + -- Update concurrency using runId (not the full encoded member) + redis.call('ZREM', queueKey, member) + redis.call('ZREM', envQueueKey, runId) + redis.call('SADD', queueCurrentConcurrencyKey, runId) + redis.call('SADD', envCurrentConcurrencyKey, runId) + + -- Add to results: [runId, score, payload, ...] + -- For V3, payload is the encoded member; for legacy, it's JSON + table.insert(results, runId) table.insert(results, messageScore) table.insert(results, messagePayload) - + dequeuedCount = dequeuedCount + 1 end end @@ -2411,7 +2651,8 @@ else redis.call('ZADD', masterQueueKey, earliestMessage[2], queueName) end --- Return results as a flat array: [messageId1, messageScore1, messagePayload1, messageId2, messageScore2, messagePayload2, ...] +-- Return results as a flat array: [runId1, messageScore1, messagePayload1, runId2, messageScore2, messagePayload2, ...] +-- For V3, messagePayload is the encoded member string; for legacy, it's JSON return results `, }); @@ -2548,6 +2789,48 @@ redis.call('SREM', envCurrentDequeuedKey, messageId) redis.call('ZADD', messageQueueKey, messageScore, messageId) redis.call('ZADD', envQueueKey, messageScore, messageId) +-- Rebalance the parent queues +local earliestMessage = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES') +if #earliestMessage == 0 then + redis.call('ZREM', masterQueueKey, messageQueueName) +else + redis.call('ZADD', masterQueueKey, earliestMessage[2], messageQueueName) +end +`, + }); + + // V3 optimized nack: no message key, encoded member in sorted set + this.redis.defineCommand("nackMessageV3", { + numberOfKeys: 7, + lua: ` +-- Keys: +local masterQueueKey = KEYS[1] +local messageQueueKey = KEYS[2] +local queueCurrentConcurrencyKey = KEYS[3] +local envCurrentConcurrencyKey = KEYS[4] +local queueCurrentDequeuedKey = KEYS[5] +local envCurrentDequeuedKey = KEYS[6] +local envQueueKey = KEYS[7] + +-- Args: +local encodedMember = ARGV[1] +local messageId = ARGV[2] +local messageQueueName = ARGV[3] +local messageScore = tonumber(ARGV[4]) + +-- NO message key write - data is encoded in the sorted set member + +-- Update the concurrency keys (use runId/messageId) +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + +-- Enqueue the encoded member into the queue +redis.call('ZADD', messageQueueKey, messageScore, encodedMember) +-- For env queue, use messageId for consistency with counting +redis.call('ZADD', envQueueKey, messageScore, messageId) + -- Rebalance the parent queues local earliestMessage = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES') if #earliestMessage == 0 then @@ -2878,6 +3161,41 @@ declare module "@internal/redis" { maxCount: string, callback?: Callback ): Result; + + // V3 optimized commands (no message key) + enqueueMessageV3( + //keys + masterQueueKey: string, + queue: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + //args + queueName: string, + encodedMember: string, + messageId: string, + messageScore: string, + callback?: Callback + ): Result; + + nackMessageV3( + // keys + masterQueueKey: string, + messageQueue: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + // args + encodedMember: string, + messageId: string, + messageQueueName: string, + messageScore: string, + callback?: Callback + ): Result; } } diff --git a/internal-packages/run-engine/src/run-queue/messageEncoding.ts b/internal-packages/run-engine/src/run-queue/messageEncoding.ts new file mode 100644 index 0000000000..663ab55c58 --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/messageEncoding.ts @@ -0,0 +1,281 @@ +import { RuntimeEnvironmentType } from "@trigger.dev/database"; +import type { OutputPayload, OutputPayloadV2, QueueDescriptor } from "./types.js"; + +/** + * Message encoding for optimized Redis storage. + * + * This module provides encoding/decoding for the "v3" message format that eliminates + * the need for separate message keys in Redis, reducing storage by ~80% for pending messages. + * + * ## Migration Strategy + * - New messages are written in v3 format (no message key created) + * - Old messages (v1/v2) continue to work via message key lookup + * - Detection is automatic based on format + * - Old messages drain naturally as they're processed + * - No backfill required + * + * ## Format Detection + * - Sorted set member with DELIMITER = v3 format + * - Sorted set member without DELIMITER = legacy format (needs message key lookup) + * - Worker queue entry starting with "{org:" = legacy format + * - Worker queue entry with DELIMITER = v3 format + */ + +// ASCII Record Separator - won't appear in IDs, queue names, or other fields +const DELIMITER = "\x1e"; + +// Environment type single-char encoding for compact storage +const ENV_TYPE_TO_CHAR: Record = { + DEVELOPMENT: "D", + STAGING: "S", + PREVIEW: "V", + PRODUCTION: "P", +}; + +const CHAR_TO_ENV_TYPE: Record = { + D: "DEVELOPMENT", + S: "STAGING", + V: "PREVIEW", + P: "PRODUCTION", +}; + +/** + * Data encoded in sorted set member for v3 format. + * Everything else is derived from the queue key. + */ +export interface EncodedQueueMember { + runId: string; + workerQueue: string; + attempt: number; + environmentType: RuntimeEnvironmentType; +} + +/** + * Data encoded in worker queue entry for v3 format. + * Includes queue key and timestamp to fully reconstruct the message. + */ +export interface EncodedWorkerQueueEntry extends EncodedQueueMember { + queueKey: string; + timestamp: number; +} + +/** + * Check if a sorted set member is in v3 encoded format. + */ +export function isEncodedQueueMember(member: string): boolean { + return member.includes(DELIMITER); +} + +/** + * Check if a worker queue entry is in v3 encoded format. + * Legacy format starts with "{org:" (message key path). + */ +export function isEncodedWorkerQueueEntry(entry: string): boolean { + return !entry.startsWith("{org:") && entry.includes(DELIMITER); +} + +/** + * Encode message data for sorted set member. + * Format: runId␞workerQueue␞attempt␞envTypeChar + */ +export function encodeQueueMember(data: EncodedQueueMember): string { + const envChar = ENV_TYPE_TO_CHAR[data.environmentType]; + return [data.runId, data.workerQueue, data.attempt.toString(), envChar].join(DELIMITER); +} + +/** + * Decode sorted set member to message data. + * Returns undefined if not in v3 format. + */ +export function decodeQueueMember(member: string): EncodedQueueMember | undefined { + if (!isEncodedQueueMember(member)) { + return undefined; + } + + const parts = member.split(DELIMITER); + if (parts.length !== 4) { + return undefined; + } + + const [runId, workerQueue, attemptStr, envChar] = parts; + const environmentType = CHAR_TO_ENV_TYPE[envChar]; + + if (!environmentType) { + return undefined; + } + + return { + runId, + workerQueue, + attempt: parseInt(attemptStr, 10), + environmentType, + }; +} + +/** + * Encode message data for worker queue entry. + * Format: runId␞workerQueue␞attempt␞envTypeChar␞queueKey␞timestamp + */ +export function encodeWorkerQueueEntry(data: EncodedWorkerQueueEntry): string { + const envChar = ENV_TYPE_TO_CHAR[data.environmentType]; + return [ + data.runId, + data.workerQueue, + data.attempt.toString(), + envChar, + data.queueKey, + data.timestamp.toString(), + ].join(DELIMITER); +} + +/** + * Decode worker queue entry to message data. + * Returns undefined if not in v3 format. + */ +export function decodeWorkerQueueEntry(entry: string): EncodedWorkerQueueEntry | undefined { + if (!isEncodedWorkerQueueEntry(entry)) { + return undefined; + } + + const parts = entry.split(DELIMITER); + if (parts.length !== 6) { + return undefined; + } + + const [runId, workerQueue, attemptStr, envChar, queueKey, timestampStr] = parts; + const environmentType = CHAR_TO_ENV_TYPE[envChar]; + + if (!environmentType) { + return undefined; + } + + return { + runId, + workerQueue, + attempt: parseInt(attemptStr, 10), + environmentType, + queueKey, + timestamp: parseInt(timestampStr, 10), + }; +} + +/** + * Reconstruct full OutputPayloadV2 from encoded worker queue entry and queue descriptor. + */ +export function reconstructMessageFromWorkerEntry( + entry: EncodedWorkerQueueEntry, + descriptor: QueueDescriptor +): OutputPayloadV2 { + return { + version: "2", + runId: entry.runId, + taskIdentifier: descriptor.queue, + orgId: descriptor.orgId, + projectId: descriptor.projectId, + environmentId: descriptor.envId, + environmentType: entry.environmentType, + queue: entry.queueKey, + concurrencyKey: descriptor.concurrencyKey, + timestamp: entry.timestamp, + attempt: entry.attempt, + workerQueue: entry.workerQueue, + }; +} + +/** + * Extract runId from either v3 encoded member or legacy member. + * Legacy members are just the runId itself. + */ +export function getRunIdFromMember(member: string): string { + if (isEncodedQueueMember(member)) { + const decoded = decodeQueueMember(member); + return decoded?.runId ?? member; + } + return member; +} + +/** + * Lua helper functions to be included in Redis scripts. + * These handle format detection and parsing within Lua. + */ +export const LUA_ENCODING_HELPERS = ` +-- Delimiter for v3 encoded format (ASCII Record Separator) +local DELIMITER = "\\x1e" + +-- Check if a string is v3 encoded (contains delimiter) +local function isV3Encoded(str) + return string.find(str, DELIMITER, 1, true) ~= nil +end + +-- Check if worker queue entry is legacy format (starts with {org:) +local function isLegacyWorkerEntry(entry) + return string.sub(entry, 1, 5) == "{org:" +end + +-- Encode queue member: runId, workerQueue, attempt, envTypeChar +local function encodeQueueMember(runId, workerQueue, attempt, envTypeChar) + return runId .. DELIMITER .. workerQueue .. DELIMITER .. tostring(attempt) .. DELIMITER .. envTypeChar +end + +-- Decode queue member, returns: runId, workerQueue, attempt, envTypeChar (or nil if not v3) +local function decodeQueueMember(member) + if not isV3Encoded(member) then + return nil + end + local parts = {} + for part in string.gmatch(member, "([^" .. DELIMITER .. "]+)") do + table.insert(parts, part) + end + if #parts ~= 4 then + return nil + end + return parts[1], parts[2], tonumber(parts[3]), parts[4] +end + +-- Encode worker queue entry: runId, workerQueue, attempt, envTypeChar, queueKey, timestamp +local function encodeWorkerEntry(runId, workerQueue, attempt, envTypeChar, queueKey, timestamp) + return runId .. DELIMITER .. workerQueue .. DELIMITER .. tostring(attempt) .. DELIMITER .. envTypeChar .. DELIMITER .. queueKey .. DELIMITER .. tostring(timestamp) +end + +-- Decode worker queue entry, returns: runId, workerQueue, attempt, envTypeChar, queueKey, timestamp (or nil if not v3) +local function decodeWorkerEntry(entry) + if isLegacyWorkerEntry(entry) then + return nil + end + if not isV3Encoded(entry) then + return nil + end + local parts = {} + for part in string.gmatch(entry, "([^" .. DELIMITER .. "]+)") do + table.insert(parts, part) + end + if #parts ~= 6 then + return nil + end + return parts[1], parts[2], tonumber(parts[3]), parts[4], parts[5], tonumber(parts[6]) +end + +-- Get runId from member (works for both v3 and legacy) +local function getRunIdFromMember(member) + if isV3Encoded(member) then + local runId = decodeQueueMember(member) + return runId or member + end + return member +end + +-- Environment type char mappings +local envTypeToChar = { + DEVELOPMENT = "D", + STAGING = "S", + PREVIEW = "V", + PRODUCTION = "P" +} + +local charToEnvType = { + D = "DEVELOPMENT", + S = "STAGING", + V = "PREVIEW", + P = "PRODUCTION" +} +`; diff --git a/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts b/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts new file mode 100644 index 0000000000..e1142048df --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts @@ -0,0 +1,256 @@ +import { describe, expect, test } from "vitest"; +import { + encodeQueueMember, + decodeQueueMember, + encodeWorkerQueueEntry, + decodeWorkerQueueEntry, + isEncodedQueueMember, + isEncodedWorkerQueueEntry, + getRunIdFromMember, + reconstructMessageFromWorkerEntry, +} from "../messageEncoding.js"; + +describe("messageEncoding", () => { + describe("isEncodedQueueMember", () => { + test("returns true for v3 encoded member", () => { + const encoded = encodeQueueMember({ + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 0, + environmentType: "PRODUCTION", + }); + expect(isEncodedQueueMember(encoded)).toBe(true); + }); + + test("returns false for legacy member (just runId)", () => { + expect(isEncodedQueueMember("run_abc123")).toBe(false); + }); + }); + + describe("isEncodedWorkerQueueEntry", () => { + test("returns true for v3 encoded entry", () => { + const encoded = encodeWorkerQueueEntry({ + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 0, + environmentType: "PRODUCTION", + queueKey: "{org:o123}:proj:p123:env:e123:queue:my-task", + timestamp: 1706812800000, + }); + expect(isEncodedWorkerQueueEntry(encoded)).toBe(true); + }); + + test("returns false for legacy message key path", () => { + expect(isEncodedWorkerQueueEntry("{org:o123}:message:run_abc123")).toBe(false); + }); + }); + + describe("encodeQueueMember / decodeQueueMember", () => { + test("roundtrips correctly for PRODUCTION", () => { + const original = { + runId: "run_abc123xyz", + workerQueue: "env_def456", + attempt: 0, + environmentType: "PRODUCTION" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for DEVELOPMENT", () => { + const original = { + runId: "run_test", + workerQueue: "env_dev", + attempt: 3, + environmentType: "DEVELOPMENT" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for STAGING", () => { + const original = { + runId: "run_staging", + workerQueue: "env_stage", + attempt: 1, + environmentType: "STAGING" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for PREVIEW", () => { + const original = { + runId: "run_preview", + workerQueue: "env_preview", + attempt: 5, + environmentType: "PREVIEW" as const, + }; + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("decodeQueueMember returns undefined for legacy format", () => { + expect(decodeQueueMember("run_abc123")).toBeUndefined(); + }); + + test("decodeQueueMember returns undefined for malformed v3", () => { + // Only 3 parts instead of 4 + expect(decodeQueueMember("run_abc\x1eenv_xyz\x1e0")).toBeUndefined(); + }); + }); + + describe("encodeWorkerQueueEntry / decodeWorkerQueueEntry", () => { + test("roundtrips correctly", () => { + const original = { + runId: "run_abc123xyz", + workerQueue: "env_def456", + attempt: 2, + environmentType: "PRODUCTION" as const, + queueKey: "{org:org123}:proj:proj456:env:env789:queue:my-task", + timestamp: 1706812800000, + }; + const encoded = encodeWorkerQueueEntry(original); + const decoded = decodeWorkerQueueEntry(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly with concurrency key in queue", () => { + const original = { + runId: "run_xyz", + workerQueue: "env_abc", + attempt: 0, + environmentType: "DEVELOPMENT" as const, + queueKey: "{org:o1}:proj:p1:env:e1:queue:task:ck:user-123", + timestamp: 1706812800000, + }; + const encoded = encodeWorkerQueueEntry(original); + const decoded = decodeWorkerQueueEntry(encoded); + + expect(decoded).toEqual(original); + }); + + test("decodeWorkerQueueEntry returns undefined for legacy message key", () => { + expect(decodeWorkerQueueEntry("{org:o123}:message:run_abc")).toBeUndefined(); + }); + }); + + describe("getRunIdFromMember", () => { + test("extracts runId from v3 encoded member", () => { + const encoded = encodeQueueMember({ + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 0, + environmentType: "PRODUCTION", + }); + expect(getRunIdFromMember(encoded)).toBe("run_abc123"); + }); + + test("returns legacy member as-is (it is the runId)", () => { + expect(getRunIdFromMember("run_abc123")).toBe("run_abc123"); + }); + }); + + describe("reconstructMessageFromWorkerEntry", () => { + test("reconstructs full message payload", () => { + const entry = { + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 1, + environmentType: "PRODUCTION" as const, + queueKey: "{org:org123}:proj:proj456:env:env789:queue:my-task", + timestamp: 1706812800000, + }; + const descriptor = { + orgId: "org123", + projectId: "proj456", + envId: "env789", + queue: "my-task", + concurrencyKey: undefined, + }; + + const message = reconstructMessageFromWorkerEntry(entry, descriptor); + + expect(message).toEqual({ + version: "2", + runId: "run_abc123", + taskIdentifier: "my-task", + orgId: "org123", + projectId: "proj456", + environmentId: "env789", + environmentType: "PRODUCTION", + queue: "{org:org123}:proj:proj456:env:env789:queue:my-task", + concurrencyKey: undefined, + timestamp: 1706812800000, + attempt: 1, + workerQueue: "env_xyz", + }); + }); + + test("reconstructs message with concurrency key", () => { + const entry = { + runId: "run_xyz", + workerQueue: "env_dev", + attempt: 0, + environmentType: "DEVELOPMENT" as const, + queueKey: "{org:o1}:proj:p1:env:e1:queue:task:ck:user-42", + timestamp: 1706812800000, + }; + const descriptor = { + orgId: "o1", + projectId: "p1", + envId: "e1", + queue: "task", + concurrencyKey: "user-42", + }; + + const message = reconstructMessageFromWorkerEntry(entry, descriptor); + + expect(message.concurrencyKey).toBe("user-42"); + expect(message.queue).toBe("{org:o1}:proj:p1:env:e1:queue:task:ck:user-42"); + }); + }); + + describe("encoded size comparison", () => { + test("v3 format is significantly smaller than full JSON", () => { + const fullPayload = JSON.stringify({ + version: "2", + runId: "run_clxyz123abc456def789", + taskIdentifier: "my-background-task", + orgId: "org_clxyz123abc456def789", + projectId: "proj_clxyz123abc456def789", + environmentId: "env_clxyz123abc456def789", + environmentType: "PRODUCTION", + queue: "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", + concurrencyKey: undefined, + timestamp: 1706812800000, + attempt: 0, + workerQueue: "env_clxyz123abc456def789", + }); + + const v3Encoded = encodeQueueMember({ + runId: "run_clxyz123abc456def789", + workerQueue: "env_clxyz123abc456def789", + attempt: 0, + environmentType: "PRODUCTION", + }); + + // Full JSON is typically 400-600 bytes + // v3 encoded queue member should be ~70-80 bytes + expect(v3Encoded.length).toBeLessThan(fullPayload.length * 0.2); + + console.log(`Full JSON size: ${fullPayload.length} bytes`); + console.log(`V3 encoded size: ${v3Encoded.length} bytes`); + console.log(`Reduction: ${((1 - v3Encoded.length / fullPayload.length) * 100).toFixed(1)}%`); + }); + }); +}); From 7afe9ed86583ec25fea9b5b8112c859bf5f6f402 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Feb 2026 11:45:49 +0000 Subject: [PATCH 2/6] test(run-engine): add integration tests for V2/V3 message format handling Add comprehensive tests covering: - V2 (legacy) format enqueue/dequeue/ack/nack - V3 (optimized) format enqueue/dequeue/ack/nack - Mixed format migration scenarios - Format detection helpers https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5 --- .../src/run-queue/tests/messageFormat.test.ts | 655 ++++++++++++++++++ 1 file changed, 655 insertions(+) create mode 100644 internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts diff --git a/internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts b/internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts new file mode 100644 index 0000000000..058b8eb321 --- /dev/null +++ b/internal-packages/run-engine/src/run-queue/tests/messageFormat.test.ts @@ -0,0 +1,655 @@ +import { redisTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { Logger } from "@trigger.dev/core/logger"; +import { setTimeout } from "node:timers/promises"; +import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js"; +import { RunQueue } from "../index.js"; +import { RunQueueFullKeyProducer } from "../keyProducer.js"; +import { InputPayload } from "../types.js"; +import { + encodeQueueMember, + encodeWorkerQueueEntry, + decodeQueueMember, + decodeWorkerQueueEntry, + isEncodedQueueMember, + isEncodedWorkerQueueEntry, +} from "../messageEncoding.js"; + +// Mock Decimal since we can't import from @trigger.dev/database without Prisma +const createDecimal = (value: number) => ({ + toNumber: () => value, + toString: () => value.toString(), +}); + +const testOptions = { + name: "rq", + tracer: trace.getTracer("rq"), + workers: 1, + defaultEnvConcurrency: 25, + logger: new Logger("RunQueue", "warn"), + retryOptions: { + maxAttempts: 5, + factor: 1.1, + minTimeoutInMs: 100, + maxTimeoutInMs: 1_000, + randomize: true, + }, + keys: new RunQueueFullKeyProducer(), +}; + +const createEnv = (id: string, type: "DEVELOPMENT" | "PRODUCTION" = "DEVELOPMENT") => ({ + id, + type, + maximumConcurrencyLimit: 10, + concurrencyLimitBurstFactor: createDecimal(1.0), + project: { id: "p1234" }, + organization: { id: "o1234" }, +}); + +const createMessage = ( + runId: string, + timestamp: number = Date.now() +): InputPayload => ({ + runId, + taskIdentifier: "task/my-task", + orgId: "o1234", + projectId: "p1234", + environmentId: "e1234", + environmentType: "DEVELOPMENT", + queue: "task/my-task", + timestamp, + attempt: 0, +}); + +vi.setConfig({ testTimeout: 60_000 }); + +describe("Message Format Handling", () => { + describe("V2 (Legacy) Format", () => { + redisTest( + "should enqueue and dequeue messages in V2 format when useOptimizedMessageFormat is false", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, // V2 legacy format + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v2test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_test_1"); + + // Enqueue message + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Verify message key was created (V2 format) + const messageExists = await queue.messageExists(message.orgId, message.runId); + expect(messageExists).toBe(1); // Message key should exist in V2 format + + // Verify queue length + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Wait for worker to process + await setTimeout(600); + + // Dequeue and verify + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.messageId).toBe(message.runId); + expect(dequeued?.message.version).toBe("2"); + expect(dequeued?.message.runId).toBe(message.runId); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should read message from message key in V2 format", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2read:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v2read:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_read_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Read message directly + const readMessage = await queue.readMessage(message.orgId, message.runId); + expect(readMessage).toBeDefined(); + expect(readMessage?.runId).toBe(message.runId); + expect(readMessage?.taskIdentifier).toBe(message.taskIdentifier); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should acknowledge V2 message and delete message key", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v2ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_ack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Wait for processing + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + + // Acknowledge + await queue.acknowledgeMessage(message.orgId, message.runId); + + // Verify message key is deleted + const messageExists = await queue.messageExists(message.orgId, message.runId); + expect(messageExists).toBe(0); + + // Verify queue is empty + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should nack V2 message and update message key with incremented attempt", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v2nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v2nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v2_nack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.message.attempt).toBe(0); + + // Nack (will increment attempt and requeue) + const nackResult = await queue.nackMessage({ + orgId: message.orgId, + messageId: message.runId, + retryAt: Date.now(), // Retry immediately + }); + expect(nackResult).toBe(true); + + // Verify message is back in queue + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Read message and verify attempt was incremented + const updatedMessage = await queue.readMessage(message.orgId, message.runId); + expect(updatedMessage?.attempt).toBe(1); + } finally { + await queue.quit(); + } + } + ); + }); + + describe("V3 (Optimized) Format", () => { + redisTest( + "should enqueue and dequeue messages in V3 format when useOptimizedMessageFormat is true", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, // V3 optimized format + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v3test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v3test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v3_test_1"); + + // Enqueue message + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Verify NO message key was created (V3 format) + const messageExists = await queue.messageExists(message.orgId, message.runId); + expect(messageExists).toBe(0); // Message key should NOT exist in V3 format + + // Verify queue length + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Wait for worker to process + await setTimeout(600); + + // Dequeue and verify + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.messageId).toBe(message.runId); + expect(dequeued?.message.version).toBe("2"); // Still returns OutputPayloadV2 + expect(dequeued?.message.runId).toBe(message.runId); + expect(dequeued?.message.taskIdentifier).toBe(message.taskIdentifier); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should acknowledge V3 message without error (no message key to delete)", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v3ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v3ack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v3_ack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + + // Acknowledge (should not error even though no message key exists) + await queue.acknowledgeMessage(message.orgId, message.runId); + + // Verify queue is empty + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "should nack V3 message and requeue with encoded format", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + const queue = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:v3nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:v3nack:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const message = createMessage("run_v3_nack_test"); + + await queue.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + await setTimeout(600); + + // Dequeue + const dequeued = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.message.attempt).toBe(0); + + // Nack + const nackResult = await queue.nackMessage({ + orgId: message.orgId, + messageId: message.runId, + retryAt: Date.now(), + }); + expect(nackResult).toBe(true); + + // Verify message is back in queue + expect(await queue.lengthOfQueue(env as any, message.queue)).toBe(1); + + // Wait and dequeue again to verify attempt was incremented + await setTimeout(600); + const dequeued2 = await queue.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued2).toBeDefined(); + expect(dequeued2?.message.attempt).toBe(1); + } finally { + await queue.quit(); + } + } + ); + }); + + describe("Mixed Format Migration", () => { + redisTest( + "V3 queue should be able to read V2 messages during migration", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + + // First, enqueue with V2 format + const queueV2 = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, // V2 + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + const message = createMessage("run_mixed_v2_to_v3"); + + await queueV2.enqueueMessage({ + env: env as any, + message, + workerQueue: env.id, + }); + + // Verify V2 message key exists + expect(await queueV2.messageExists(message.orgId, message.runId)).toBe(1); + + await queueV2.quit(); + + // Now create V3 queue and try to read the V2 message + const queueV3 = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, // V3 + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:mixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + await setTimeout(600); + + // V3 queue should be able to dequeue V2 message + const dequeued = await queueV3.dequeueMessageFromWorkerQueue("test_consumer", env.id); + expect(dequeued).toBeDefined(); + expect(dequeued?.messageId).toBe(message.runId); + expect(dequeued?.message.runId).toBe(message.runId); + } finally { + await queueV3.quit(); + } + } + ); + + redisTest( + "should handle multiple messages with mixed formats", + async ({ redisContainer }) => { + const env = createEnv("e1234"); + + // Create V2 queue and enqueue some messages + const queueV2 = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: false, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + const messageV2_1 = createMessage("run_v2_1", Date.now() - 3000); + const messageV2_2 = createMessage("run_v2_2", Date.now() - 2000); + + await queueV2.enqueueMessage({ env: env as any, message: messageV2_1, workerQueue: env.id }); + await queueV2.enqueueMessage({ env: env as any, message: messageV2_2, workerQueue: env.id }); + + await queueV2.quit(); + + // Switch to V3 and add more messages + const queueV3 = new RunQueue({ + ...testOptions, + useOptimizedMessageFormat: true, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:multimixed:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const messageV3_1 = createMessage("run_v3_1", Date.now() - 1000); + const messageV3_2 = createMessage("run_v3_2", Date.now()); + + await queueV3.enqueueMessage({ env: env as any, message: messageV3_1, workerQueue: env.id }); + await queueV3.enqueueMessage({ env: env as any, message: messageV3_2, workerQueue: env.id }); + + // Total should be 4 messages + expect(await queueV3.lengthOfQueue(env as any, messageV2_1.queue)).toBe(4); + + await setTimeout(600); + + // Dequeue all messages - should handle both formats + const dequeuedMessages: string[] = []; + for (let i = 0; i < 4; i++) { + const dequeued = await queueV3.dequeueMessageFromWorkerQueue("test_consumer", env.id); + if (dequeued) { + dequeuedMessages.push(dequeued.messageId); + } + } + + // All 4 messages should be dequeued successfully + expect(dequeuedMessages).toContain("run_v2_1"); + expect(dequeuedMessages).toContain("run_v2_2"); + expect(dequeuedMessages).toContain("run_v3_1"); + expect(dequeuedMessages).toContain("run_v3_2"); + } finally { + await queueV3.quit(); + } + } + ); + }); + + describe("Encoding Format Detection", () => { + test("isEncodedQueueMember correctly identifies V3 format", () => { + const v3Member = encodeQueueMember({ + runId: "run_test", + workerQueue: "env_test", + attempt: 0, + environmentType: "PRODUCTION", + }); + + expect(isEncodedQueueMember(v3Member)).toBe(true); + expect(isEncodedQueueMember("run_test")).toBe(false); // V2 is just runId + }); + + test("isEncodedWorkerQueueEntry correctly identifies formats", () => { + const v3Entry = encodeWorkerQueueEntry({ + runId: "run_test", + workerQueue: "env_test", + attempt: 0, + environmentType: "PRODUCTION", + queueKey: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: Date.now(), + }); + + const v2Entry = "{org:o1}:message:run_test"; + + expect(isEncodedWorkerQueueEntry(v3Entry)).toBe(true); + expect(isEncodedWorkerQueueEntry(v2Entry)).toBe(false); + }); + + test("decodeQueueMember extracts correct data", () => { + const original = { + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 3, + environmentType: "STAGING" as const, + }; + + const encoded = encodeQueueMember(original); + const decoded = decodeQueueMember(encoded); + + expect(decoded).toEqual(original); + }); + + test("decodeWorkerQueueEntry extracts correct data", () => { + const original = { + runId: "run_abc123", + workerQueue: "env_xyz", + attempt: 2, + environmentType: "DEVELOPMENT" as const, + queueKey: "{org:o1}:proj:p1:env:e1:queue:my-task", + timestamp: 1706812800000, + }; + + const encoded = encodeWorkerQueueEntry(original); + const decoded = decodeWorkerQueueEntry(encoded); + + expect(decoded).toEqual(original); + }); + }); +}); From 5ff7951e187d75e9413860a7809d569e45cdf027 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Feb 2026 11:50:20 +0000 Subject: [PATCH 3/6] fix(run-engine): create message key when dequeuing V3 messages for execution For V3 optimized format, we now create the message key when the run is dequeued from the worker queue (ready to execute). This allows ack/nack/readMessage to work correctly. Storage savings come from not having message keys while messages are PENDING in the queue backlog - only executing runs have message keys. https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5 --- internal-packages/run-engine/src/run-queue/index.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index a928503275..76421ba6b2 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -2373,6 +2373,12 @@ export class RunQueue { const descriptor = this.keys.descriptorFromQueue(decoded.queueKey); const message = reconstructMessageFromWorkerEntry(decoded, descriptor); + // For V3 format: create the message key now that the run is executing. + // This allows ack/nack to work (they read from message key). + // Storage savings come from not having message keys for PENDING runs (the backlog). + const messageKey = this.keys.messageKey(descriptor.orgId, message.runId); + await this.redis.set(messageKey, JSON.stringify(message)); + // Update the currentDequeued sets (this is done in the Lua script for legacy) const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKeyFromQueue(decoded.queueKey); const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKeyFromQueue(decoded.queueKey); From 400673d25b99a0a01e414d74b2dd15eabf787596 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Feb 2026 11:56:23 +0000 Subject: [PATCH 4/6] feat(run-engine): add RunDataProvider for V3 format ack/nack operations Instead of creating message keys when dequeuing V3 messages, we now read run data from PostgreSQL via a RunDataProvider when needed for ack/nack operations. This approach: - Eliminates ALL message keys for V3 format (not just pending runs) - Uses PostgreSQL as the source of truth for run data - Falls back to Redis message key for legacy V2 messages - Adds RunDataProvider interface and RunData type The RunEngine creates a runDataProvider that queries the TaskRun table for queue, orgId, environmentId, etc. when readMessage is called and no Redis message key exists. https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5 --- .../run-engine/src/engine/index.ts | 37 +++++++++++++ .../run-engine/src/run-queue/index.ts | 54 ++++++++++++++++--- .../run-engine/src/run-queue/types.ts | 28 ++++++++++ 3 files changed, 111 insertions(+), 8 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 9e81c99132..8a4e04a181 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -182,6 +182,43 @@ export class RunEngine { processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs, dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds, meter: options.meter, + // Run data provider for V3 optimized format - reads from PostgreSQL when no Redis message key exists + runDataProvider: { + getRunData: async (runId: string) => { + const run = await this.prisma.taskRun.findUnique({ + where: { id: runId }, + select: { + queue: true, + organizationId: true, + projectId: true, + runtimeEnvironmentId: true, + environmentType: true, + concurrencyKey: true, + attemptNumber: true, + queueTimestamp: true, + workerQueue: true, + taskIdentifier: true, + }, + }); + + if (!run || !run.organizationId || !run.environmentType) { + return undefined; + } + + return { + queue: run.queue, + orgId: run.organizationId, + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + environmentType: run.environmentType, + concurrencyKey: run.concurrencyKey ?? undefined, + attempt: run.attemptNumber ?? 0, + timestamp: run.queueTimestamp?.getTime() ?? Date.now(), + workerQueue: run.workerQueue, + taskIdentifier: run.taskIdentifier, + }; + }, + }, }); this.worker = new Worker({ diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 76421ba6b2..7dd13d1b75 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -39,6 +39,7 @@ import { InputPayload, OutputPayload, OutputPayloadV2, + RunDataProvider, RunQueueKeyProducer, RunQueueKeyProducerEnvironment, RunQueueSelectionStrategy, @@ -110,6 +111,12 @@ export type RunQueueOptions = { * 3. Old messages drain naturally as they're processed */ useOptimizedMessageFormat?: boolean; + /** + * Provider for fetching run data from PostgreSQL. + * Required when using V3 optimized format for ack/nack operations. + * Falls back to Redis message key if not provided (legacy behavior). + */ + runDataProvider?: RunDataProvider; }; export interface ConcurrencySweeperCallback { @@ -194,9 +201,11 @@ export class RunQueue { private _meter: Meter; private _queueCooloffStates: Map = new Map(); private _useOptimizedMessageFormat: boolean; + private _runDataProvider?: RunDataProvider; constructor(public readonly options: RunQueueOptions) { this._useOptimizedMessageFormat = options.useOptimizedMessageFormat ?? false; + this._runDataProvider = options.runDataProvider; this.shardCount = options.shardCount ?? 2; this.retryOptions = options.retryOptions ?? defaultRetrySettings; this.redis = createRedisClient(options.redis, { @@ -575,8 +584,43 @@ export class RunQueue { return this.redis.exists(this.keys.messageKey(orgId, messageId)); } - public async readMessage(orgId: string, messageId: string) { - return this.readMessageFromKey(this.keys.messageKey(orgId, messageId)); + public async readMessage(orgId: string, messageId: string): Promise { + // First try to read from Redis (legacy V2 format) + const redisMessage = await this.readMessageFromKey(this.keys.messageKey(orgId, messageId)); + if (redisMessage) { + return redisMessage; + } + + // Fall back to runDataProvider (for V3 format where there's no message key) + if (this._runDataProvider) { + const runData = await this._runDataProvider.getRunData(messageId); + if (runData) { + // Convert RunData to OutputPayloadV2 + const queueKey = this.keys.queueKey( + runData.orgId, + runData.projectId, + runData.environmentId, + runData.taskIdentifier, + runData.concurrencyKey + ); + return { + version: "2" as const, + runId: messageId, + taskIdentifier: runData.taskIdentifier, + orgId: runData.orgId, + projectId: runData.projectId, + environmentId: runData.environmentId, + environmentType: runData.environmentType, + queue: queueKey, + concurrencyKey: runData.concurrencyKey, + timestamp: runData.timestamp, + attempt: runData.attempt, + workerQueue: runData.workerQueue, + }; + } + } + + return undefined; } public async readMessageFromKey(messageKey: string) { @@ -2373,12 +2417,6 @@ export class RunQueue { const descriptor = this.keys.descriptorFromQueue(decoded.queueKey); const message = reconstructMessageFromWorkerEntry(decoded, descriptor); - // For V3 format: create the message key now that the run is executing. - // This allows ack/nack to work (they read from message key). - // Storage savings come from not having message keys for PENDING runs (the backlog). - const messageKey = this.keys.messageKey(descriptor.orgId, message.runId); - await this.redis.set(messageKey, JSON.stringify(message)); - // Update the currentDequeued sets (this is done in the Lua script for legacy) const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKeyFromQueue(decoded.queueKey); const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKeyFromQueue(decoded.queueKey); diff --git a/internal-packages/run-engine/src/run-queue/types.ts b/internal-packages/run-engine/src/run-queue/types.ts index ee1ce41b79..903ae138e2 100644 --- a/internal-packages/run-engine/src/run-queue/types.ts +++ b/internal-packages/run-engine/src/run-queue/types.ts @@ -133,3 +133,31 @@ export interface RunQueueSelectionStrategy { consumerId: string ): Promise>; } + +/** + * Provider for fetching run data from a persistent store (e.g., PostgreSQL). + * Used for V3 optimized format where message data is not stored in Redis. + */ +export interface RunDataProvider { + /** + * Fetch run data for ack/nack operations. + * Returns undefined if the run is not found. + */ + getRunData(runId: string): Promise; +} + +/** + * Run data needed for queue operations (ack, nack, release concurrency). + */ +export type RunData = { + queue: string; + orgId: string; + projectId: string; + environmentId: string; + environmentType: RuntimeEnvironmentType; + concurrencyKey?: string; + attempt: number; + timestamp: number; + workerQueue: string; + taskIdentifier: string; +}; From 424a68ed2e616e0611ee145986a355a7417877ee Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Feb 2026 15:49:15 +0000 Subject: [PATCH 5/6] feat(run-engine): wire up TTL system callback and compute ttlExpiresAt on enqueue - Add TTL system options to RunEngineOptions.queue for configuration - Add #ttlExpiredCallback method on RunEngine that calls ttlSystem.expireRun() - Pass TTL system options to RunQueue in RunEngine constructor - Compute ttlExpiresAt from run.ttl when enqueuing runs in EnqueueSystem - Add env vars for TTL system configuration (disabled, shard count, poll interval, batch size) - Configure TTL system in webapp's runEngine.server.ts The TTL system enables automatic expiration of runs that have been in the queue past their TTL deadline. When runs expire, the callback updates their status to EXPIRED in the database and emits appropriate events. https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5 --- apps/webapp/app/env.server.ts | 6 +++ apps/webapp/app/v3/runEngine.server.ts | 6 +++ .../run-engine/src/engine/index.ts | 37 +++++++++++++++++++ .../src/engine/systems/enqueueSystem.ts | 11 ++++++ .../run-engine/src/engine/types.ts | 11 ++++++ 5 files changed, 71 insertions(+) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index dcbcac079a..885232dd12 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -591,6 +591,12 @@ const EnvironmentSchema = z RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(), RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(), + // TTL System settings for automatic run expiration + RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false), + RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(), + RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000), + RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100), + RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000), RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000), RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index efba5fbdb0..b0dc1e8d0d 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -80,6 +80,12 @@ function createRunEngine() { scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS, processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS, }, + ttlSystem: { + disabled: env.RUN_ENGINE_TTL_SYSTEM_DISABLED, + shardCount: env.RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT, + pollIntervalMs: env.RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS, + batchSize: env.RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE, + }, }, runLock: { redis: { diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 8a4e04a181..08f0d7849f 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -182,6 +182,14 @@ export class RunEngine { processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs, dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds, meter: options.meter, + ttlSystem: options.queue?.ttlSystem?.disabled + ? undefined + : { + shardCount: options.queue?.ttlSystem?.shardCount, + pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs, + batchSize: options.queue?.ttlSystem?.batchSize, + callback: this.#ttlExpiredCallback.bind(this), + }, // Run data provider for V3 optimized format - reads from PostgreSQL when no Redis message key exists runDataProvider: { getRunData: async (runId: string) => { @@ -2103,6 +2111,35 @@ export class RunEngine { })); } + /** + * Callback for the TTL system when runs expire. + * Calls ttlSystem.expireRun() for each expired run to update database and emit events. + */ + async #ttlExpiredCallback( + runs: Array<{ queueKey: string; runId: string; orgId: string }> + ): Promise { + // Process expired runs concurrently with limited parallelism + await pMap( + runs, + async (run) => { + try { + await this.ttlSystem.expireRun({ runId: run.runId }); + this.logger.debug("TTL system expired run", { + runId: run.runId, + orgId: run.orgId, + }); + } catch (error) { + this.logger.error("Failed to expire run via TTL system", { + runId: run.runId, + orgId: run.orgId, + error, + }); + } + }, + { concurrency: 10 } + ); + } + /** * Invalidates the billing cache for an organization when their plan changes * Runs in background and handles all errors internally diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 395e44727c..4726bdb736 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -4,6 +4,7 @@ import { TaskRun, TaskRunExecutionStatus, } from "@trigger.dev/database"; +import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic"; import { MinimalAuthenticatedEnvironment } from "../../shared/index.js"; import { ExecutionSnapshotSystem } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; @@ -81,6 +82,15 @@ export class EnqueueSystem { const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs; + // Calculate TTL expiration timestamp if the run has a TTL + let ttlExpiresAt: number | undefined; + if (run.ttl) { + const expireAt = parseNaturalLanguageDuration(run.ttl); + if (expireAt) { + ttlExpiresAt = expireAt.getTime(); + } + } + await this.$.runQueue.enqueueMessage({ env, workerQueue, @@ -95,6 +105,7 @@ export class EnqueueSystem { concurrencyKey: run.concurrencyKey ?? undefined, timestamp, attempt: 0, + ttlExpiresAt, }, }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index ee5176c2fa..a9e188f2a3 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -63,6 +63,17 @@ export type RunEngineOptions = { scanJitterInMs?: number; processMarkedJitterInMs?: number; }; + /** TTL system options for automatic run expiration */ + ttlSystem?: { + /** Number of shards for TTL sorted sets (default: same as queue shards) */ + shardCount?: number; + /** How often to poll each shard for expired runs (ms, default: 1000) */ + pollIntervalMs?: number; + /** Max number of runs to expire per poll per shard (default: 100) */ + batchSize?: number; + /** Whether TTL consumers are disabled (default: false) */ + disabled?: boolean; + }; }; runLock: { redis: RedisOptions; From 854795293013736c6316a7fa019685713dd0597d Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Feb 2026 19:06:42 +0000 Subject: [PATCH 6/6] refactor(run-queue): use runId in sorted set with compact message key for V3 format Changes the V3 message format to use a more reliable architecture: - Sorted set member is now just runId (same as legacy) for reliable ZREM operations - Message key stores compact V3 format: v3:queue|timestamp|attempt|envType|workerQueue - This achieves ~64% storage reduction vs JSON while maintaining reliable queue operations The previous approach of encoding data in sorted set member was fragile because ZREM requires exact byte-for-byte match - if attempt number was wrong during reconstruction, the remove would silently fail. https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5 --- .../run-engine/src/run-queue/index.ts | 265 +++++++++--------- .../src/run-queue/messageEncoding.ts | 81 +++++- .../run-queue/tests/messageEncoding.test.ts | 116 +++++++- 3 files changed, 332 insertions(+), 130 deletions(-) diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 7dd13d1b75..43a9c92a61 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -45,12 +45,12 @@ import { RunQueueSelectionStrategy, } from "./types.js"; import { - encodeQueueMember, + encodeMessageKeyValue, + isV3MessageKeyValue, encodeWorkerQueueEntry, decodeWorkerQueueEntry, isEncodedWorkerQueueEntry, reconstructMessageFromWorkerEntry, - LUA_ENCODING_HELPERS, } from "./messageEncoding.js"; import { WorkerQueueResolver } from "./workerQueueResolver.js"; @@ -585,13 +585,16 @@ export class RunQueue { } public async readMessage(orgId: string, messageId: string): Promise { - // First try to read from Redis (legacy V2 format) - const redisMessage = await this.readMessageFromKey(this.keys.messageKey(orgId, messageId)); + // First try to read from Redis (handles both legacy JSON and V3 compact format) + const redisMessage = await this.readMessageFromKey( + this.keys.messageKey(orgId, messageId), + messageId + ); if (redisMessage) { return redisMessage; } - // Fall back to runDataProvider (for V3 format where there's no message key) + // Fall back to runDataProvider (for cases where message key doesn't exist) if (this._runDataProvider) { const runData = await this._runDataProvider.getRunData(messageId); if (runData) { @@ -623,7 +626,7 @@ export class RunQueue { return undefined; } - public async readMessageFromKey(messageKey: string) { + public async readMessageFromKey(messageKey: string, runId?: string) { return this.#trace( "readMessageFromKey", async (span) => { @@ -633,7 +636,10 @@ export class RunQueue { return; } - const [error, message] = parseRawMessage(rawMessage); + const [error, message] = parseRawMessage(rawMessage, { + keys: this.keys, + runId, + }); if (error) { this.logger.error(`[${this.name}] Failed to parse message`, { @@ -1554,17 +1560,19 @@ export class RunQueue { const messageScore = String(message.timestamp); if (this._useOptimizedMessageFormat) { - // V3 optimized format: encode message data in sorted set member, no separate message key - const encodedMember = encodeQueueMember({ - runId: message.runId, - workerQueue: message.workerQueue, + // V3 optimized format: compact message key, runId in sorted set + const messageData = encodeMessageKeyValue({ + queue: message.queue, + timestamp: message.timestamp, attempt: message.attempt, environmentType: message.environmentType, + workerQueue: message.workerQueue, }); this.logger.debug("Calling enqueueMessageV3 (optimized)", { queueKey, - encodedMember, + messageKey, + messageData, queueCurrentConcurrencyKey, envCurrentConcurrencyKey, envQueueKey, @@ -1576,14 +1584,15 @@ export class RunQueue { await this.redis.enqueueMessageV3( masterQueueKey, queueKey, + messageKey, queueCurrentConcurrencyKey, envCurrentConcurrencyKey, queueCurrentDequeuedKey, envCurrentDequeuedKey, envQueueKey, queueName, - encodedMember, - messageId, // For env queue, still use runId for consistency + messageId, + messageData, messageScore ); } else { @@ -1701,42 +1710,20 @@ export class RunQueue { const messageScore = result[i + 1]; const rawMessage = result[i + 2]; - let message: OutputPayload | undefined; - - // Try to detect V3 encoded format (contains \x1e delimiter and doesn't start with '{') - if (rawMessage.includes("\x1e") && !rawMessage.startsWith("{")) { - // V3 format: rawMessage is encoded member "runId|workerQueue|attempt|envType" - const decoded = decodeWorkerQueueEntry( - rawMessage + "\x1e" + messageQueue + "\x1e" + messageScore - ); - if (decoded) { - const descriptor = this.keys.descriptorFromQueue(messageQueue); - message = reconstructMessageFromWorkerEntry(decoded, descriptor); - this.logger.debug("Parsed V3 encoded message", { - messageId, - decoded, - service: this.name, - }); - } else { - this.logger.error(`[${this.name}] Failed to decode V3 message`, { - messageId, - rawMessage, - service: this.name, - }); - continue; - } - } else { - // Legacy format: rawMessage is JSON - const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); - if (!parsedMessage.success) { - this.logger.error(`[${this.name}] Failed to parse legacy message`, { - messageId, - error: parsedMessage.error, - service: this.name, - }); - continue; - } - message = parsedMessage.data; + // Parse message - handles both JSON (legacy) and V3 compact format + const [error, message] = parseRawMessage(rawMessage, { + keys: this.keys, + runId: messageId, + }); + + if (error) { + this.logger.error(`[${this.name}] Failed to parse dequeued message`, { + messageId, + error, + rawMessage, + service: this.name, + }); + continue; } if (message) { @@ -2018,16 +2005,18 @@ export class RunQueue { const messageScore = retryAt ?? (nextRetryDelay ? Date.now() + nextRetryDelay : Date.now()); if (this._useOptimizedMessageFormat) { - // V3 format: encode message data in sorted set member, no message key - const encodedMember = encodeQueueMember({ - runId: message.runId, - workerQueue: this.#getWorkerQueueFromMessage(message), + // V3 format: compact message key, runId in sorted set + const messageData = encodeMessageKeyValue({ + queue: message.queue, + timestamp: message.timestamp, attempt: message.attempt, environmentType: message.environmentType, + workerQueue: this.#getWorkerQueueFromMessage(message), }); this.logger.debug("Calling nackMessageV3 (optimized)", { - encodedMember, + messageKey, + messageData, messageQueue, masterQueueKey, queueCurrentConcurrencyKey, @@ -2041,6 +2030,7 @@ export class RunQueue { await this.redis.nackMessageV3( //keys masterQueueKey, + messageKey, messageQueue, queueCurrentConcurrencyKey, envCurrentConcurrencyKey, @@ -2048,9 +2038,9 @@ export class RunQueue { envCurrentDequeuedKey, envQueueKey, //args - encodedMember, messageId, messageQueue, + messageData, String(messageScore) ); } else { @@ -2442,7 +2432,13 @@ export class RunQueue { return; } - const [error, message] = parseRawMessage(rawMessage); + // Extract runId from message key path (format: {org:orgId}:message:{runId}) + const runIdFromKey = messageKey.split(":message:").pop(); + + const [error, message] = parseRawMessage(rawMessage, { + keys: this.keys, + runId: runIdFromKey, + }); if (error) { this.logger.error(`[${this.name}] Failed to parse message`, { @@ -2532,29 +2528,31 @@ redis.call('SREM', envCurrentDequeuedKey, messageId) `, }); - // V3 optimized enqueue: no message key, encoded member in sorted set + // V3 optimized enqueue: compact message key format, runId in sorted set this.redis.defineCommand("enqueueMessageV3", { - numberOfKeys: 7, + numberOfKeys: 8, lua: ` local masterQueueKey = KEYS[1] local queueKey = KEYS[2] -local queueCurrentConcurrencyKey = KEYS[3] -local envCurrentConcurrencyKey = KEYS[4] -local queueCurrentDequeuedKey = KEYS[5] -local envCurrentDequeuedKey = KEYS[6] -local envQueueKey = KEYS[7] +local messageKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] local queueName = ARGV[1] -local encodedMember = ARGV[2] -local messageId = ARGV[3] +local messageId = ARGV[2] +local messageData = ARGV[3] local messageScore = ARGV[4] --- NO message key write - data is encoded in the sorted set member +-- Write the compact V3 message data to the message key +redis.call('SET', messageKey, messageData) --- Add the encoded member to the queue (contains runId|workerQueue|attempt|envType) -redis.call('ZADD', queueKey, messageScore, encodedMember) +-- Add the messageId (runId) to the queue - simple and reliable +redis.call('ZADD', queueKey, messageScore, messageId) --- Add the messageId to the env queue (for counting, using runId for consistency) +-- Add the messageId to the env queue (for counting) redis.call('ZADD', envQueueKey, messageScore, messageId) -- Rebalance the parent queues @@ -2594,23 +2592,6 @@ local defaultEnvConcurrencyBurstFactor = ARGV[4] local keyPrefix = ARGV[5] local maxCount = tonumber(ARGV[6] or '1') --- V3 format detection helper: ASCII Record Separator delimiter -local DELIMITER = "\\x1e" - --- Check if member is V3 encoded format (contains delimiter) -local function isV3Encoded(member) - return string.find(member, DELIMITER, 1, true) ~= nil -end - --- Extract runId from V3 encoded member (first field before delimiter) -local function getRunIdFromV3(member) - local delimPos = string.find(member, DELIMITER, 1, true) - if delimPos then - return string.sub(member, 1, delimPos - 1) - end - return member -end - -- Check current env concurrency against the limit local envCurrentConcurrency = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') local envConcurrencyLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) @@ -2651,33 +2632,24 @@ local results = {} local dequeuedCount = 0 -- Process messages in pairs (member, score) +-- Member is always runId (both legacy and V3 formats use runId in sorted set) for i = 1, #messages, 2 do - local member = messages[i] + local runId = messages[i] local messageScore = tonumber(messages[i + 1]) - local runId - local messagePayload - - if isV3Encoded(member) then - -- V3 format: member contains encoded data (runId|workerQueue|attempt|envType) - runId = getRunIdFromV3(member) - -- For V3, the "payload" is the encoded member itself - TypeScript will reconstruct - messagePayload = member - else - -- Legacy format: member is just runId, fetch from message key - runId = member - local messageKey = messageKeyPrefix .. runId - messagePayload = redis.call('GET', messageKey) - end + + -- Fetch message data from message key + local messageKey = messageKeyPrefix .. runId + local messagePayload = redis.call('GET', messageKey) if messagePayload then - -- Update concurrency using runId (not the full encoded member) - redis.call('ZREM', queueKey, member) + -- Remove from queues and update concurrency + redis.call('ZREM', queueKey, runId) redis.call('ZREM', envQueueKey, runId) redis.call('SADD', queueCurrentConcurrencyKey, runId) redis.call('SADD', envCurrentConcurrencyKey, runId) -- Add to results: [runId, score, payload, ...] - -- For V3, payload is the encoded member; for legacy, it's JSON + -- Payload can be JSON (legacy) or V3 compact format table.insert(results, runId) table.insert(results, messageScore) table.insert(results, messagePayload) @@ -2696,7 +2668,7 @@ else end -- Return results as a flat array: [runId1, messageScore1, messagePayload1, runId2, messageScore2, messagePayload2, ...] --- For V3, messagePayload is the encoded member string; for legacy, it's JSON +-- messagePayload can be JSON (legacy) or V3 compact format - TypeScript handles parsing return results `, }); @@ -2843,36 +2815,37 @@ end `, }); - // V3 optimized nack: no message key, encoded member in sorted set + // V3 optimized nack: compact message key format, runId in sorted set this.redis.defineCommand("nackMessageV3", { - numberOfKeys: 7, + numberOfKeys: 8, lua: ` -- Keys: local masterQueueKey = KEYS[1] -local messageQueueKey = KEYS[2] -local queueCurrentConcurrencyKey = KEYS[3] -local envCurrentConcurrencyKey = KEYS[4] -local queueCurrentDequeuedKey = KEYS[5] -local envCurrentDequeuedKey = KEYS[6] -local envQueueKey = KEYS[7] +local messageKey = KEYS[2] +local messageQueueKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] -- Args: -local encodedMember = ARGV[1] -local messageId = ARGV[2] -local messageQueueName = ARGV[3] +local messageId = ARGV[1] +local messageQueueName = ARGV[2] +local messageData = ARGV[3] local messageScore = tonumber(ARGV[4]) --- NO message key write - data is encoded in the sorted set member +-- Write the compact V3 message data to the message key +redis.call('SET', messageKey, messageData) --- Update the concurrency keys (use runId/messageId) +-- Update the concurrency keys redis.call('SREM', queueCurrentConcurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) redis.call('SREM', queueCurrentDequeuedKey, messageId) redis.call('SREM', envCurrentDequeuedKey, messageId) --- Enqueue the encoded member into the queue -redis.call('ZADD', messageQueueKey, messageScore, encodedMember) --- For env queue, use messageId for consistency with counting +-- Enqueue the messageId (runId) into the queue - simple and reliable +redis.call('ZADD', messageQueueKey, messageScore, messageId) redis.call('ZADD', envQueueKey, messageScore, messageId) -- Rebalance the parent queues @@ -3206,11 +3179,12 @@ declare module "@internal/redis" { callback?: Callback ): Result; - // V3 optimized commands (no message key) + // V3 optimized commands (compact message key format) enqueueMessageV3( //keys masterQueueKey: string, queue: string, + messageKey: string, queueCurrentConcurrencyKey: string, envCurrentConcurrencyKey: string, queueCurrentDequeuedKey: string, @@ -3218,8 +3192,8 @@ declare module "@internal/redis" { envQueueKey: string, //args queueName: string, - encodedMember: string, messageId: string, + messageData: string, messageScore: string, callback?: Callback ): Result; @@ -3227,6 +3201,7 @@ declare module "@internal/redis" { nackMessageV3( // keys masterQueueKey: string, + messageKey: string, messageQueue: string, queueCurrentConcurrencyKey: string, envCurrentConcurrencyKey: string, @@ -3234,9 +3209,9 @@ declare module "@internal/redis" { envCurrentDequeuedKey: string, envQueueKey: string, // args - encodedMember: string, messageId: string, messageQueueName: string, + messageData: string, messageScore: string, callback?: Callback ): Result; @@ -3245,7 +3220,45 @@ declare module "@internal/redis" { type ParseRawMessageResult = [Error | null, OutputPayload | null]; -function parseRawMessage(rawMessage: string): ParseRawMessageResult { +function parseRawMessage( + rawMessage: string, + options?: { + keys?: RunQueueKeyProducer; + runId?: string; + } +): ParseRawMessageResult { + // Check for V3 compact format (starts with "v3:" prefix) + if (isV3MessageKeyValue(rawMessage)) { + const decoded = decodeMessageKeyValue(rawMessage); + if (!decoded) { + return [new Error("Failed to decode V3 message format"), undefined]; + } + + // Extract additional fields from the queue key + if (!options?.keys) { + return [new Error("Keys required to parse V3 message format"), undefined]; + } + const descriptor = options.keys.descriptorFromQueue(decoded.queue); + + const message: OutputPayloadV2 = { + version: "2", + runId: options.runId ?? "", // Filled in by caller + taskIdentifier: descriptor.queue, + orgId: descriptor.orgId, + projectId: descriptor.projectId, + environmentId: descriptor.envId, + environmentType: decoded.environmentType, + queue: decoded.queue, + concurrencyKey: descriptor.concurrencyKey, + timestamp: decoded.timestamp, + attempt: decoded.attempt, + workerQueue: decoded.workerQueue, + }; + + return [null, message]; + } + + // Legacy JSON format const deserializedMessage = safeJsonParse(rawMessage); const message = OutputPayload.safeParse(deserializedMessage); diff --git a/internal-packages/run-engine/src/run-queue/messageEncoding.ts b/internal-packages/run-engine/src/run-queue/messageEncoding.ts index 663ab55c58..064dfc386d 100644 --- a/internal-packages/run-engine/src/run-queue/messageEncoding.ts +++ b/internal-packages/run-engine/src/run-queue/messageEncoding.ts @@ -40,8 +40,26 @@ const CHAR_TO_ENV_TYPE: Record = { }; /** - * Data encoded in sorted set member for v3 format. - * Everything else is derived from the queue key. + * Data encoded in V3 message key value. + * Uses compact pipe-delimited format instead of JSON. + * Fields that can be derived from the queue key are excluded. + */ +export interface EncodedMessageKeyData { + /** Full queue key - needed for queue operations */ + queue: string; + /** Unix timestamp for scoring */ + timestamp: number; + /** Attempt number for retry logic */ + attempt: number; + /** Environment type (single char encoded) */ + environmentType: RuntimeEnvironmentType; + /** Worker queue name for routing */ + workerQueue: string; +} + +/** + * @deprecated V3 no longer encodes in sorted set member. Use runId directly. + * Kept for backwards compatibility during migration. */ export interface EncodedQueueMember { runId: string; @@ -59,8 +77,67 @@ export interface EncodedWorkerQueueEntry extends EncodedQueueMember { timestamp: number; } +// V3 message key prefix to distinguish from legacy JSON +const V3_MESSAGE_PREFIX = "v3:"; + +/** + * Encode data for V3 message key value. + * Format: v3:queue␞timestamp␞attempt␞envTypeChar␞workerQueue + * + * This is ~60-100 bytes vs ~400-600+ bytes for JSON. + */ +export function encodeMessageKeyValue(data: EncodedMessageKeyData): string { + const envChar = ENV_TYPE_TO_CHAR[data.environmentType]; + return ( + V3_MESSAGE_PREFIX + + [data.queue, data.timestamp.toString(), data.attempt.toString(), envChar, data.workerQueue].join( + DELIMITER + ) + ); +} + +/** + * Decode V3 message key value. + * Returns undefined if not in V3 format. + */ +export function decodeMessageKeyValue(value: string): EncodedMessageKeyData | undefined { + if (!value.startsWith(V3_MESSAGE_PREFIX)) { + return undefined; + } + + const content = value.slice(V3_MESSAGE_PREFIX.length); + const parts = content.split(DELIMITER); + + if (parts.length !== 5) { + return undefined; + } + + const [queue, timestampStr, attemptStr, envChar, workerQueue] = parts; + const environmentType = CHAR_TO_ENV_TYPE[envChar]; + + if (!environmentType) { + return undefined; + } + + return { + queue, + timestamp: parseInt(timestampStr, 10), + attempt: parseInt(attemptStr, 10), + environmentType, + workerQueue, + }; +} + +/** + * Check if a message key value is V3 format (starts with v3: prefix). + */ +export function isV3MessageKeyValue(value: string): boolean { + return value.startsWith(V3_MESSAGE_PREFIX); +} + /** * Check if a sorted set member is in v3 encoded format. + * @deprecated V3 no longer encodes in sorted set. Members are just runIds. */ export function isEncodedQueueMember(member: string): boolean { return member.includes(DELIMITER); diff --git a/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts b/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts index e1142048df..e3135af3b8 100644 --- a/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/messageEncoding.test.ts @@ -8,6 +8,9 @@ import { isEncodedWorkerQueueEntry, getRunIdFromMember, reconstructMessageFromWorkerEntry, + encodeMessageKeyValue, + decodeMessageKeyValue, + isV3MessageKeyValue, } from "../messageEncoding.js"; describe("messageEncoding", () => { @@ -220,8 +223,116 @@ describe("messageEncoding", () => { }); }); + describe("encodeMessageKeyValue / decodeMessageKeyValue", () => { + test("roundtrips correctly for PRODUCTION", () => { + const original = { + queue: "{org:org123}:proj:proj456:env:env789:queue:my-task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION" as const, + workerQueue: "env_xyz", + }; + const encoded = encodeMessageKeyValue(original); + const decoded = decodeMessageKeyValue(encoded); + + expect(decoded).toEqual(original); + }); + + test("roundtrips correctly for DEVELOPMENT", () => { + const original = { + queue: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: 1706812800123, + attempt: 5, + environmentType: "DEVELOPMENT" as const, + workerQueue: "env_dev", + }; + const encoded = encodeMessageKeyValue(original); + const decoded = decodeMessageKeyValue(encoded); + + expect(decoded).toEqual(original); + }); + + test("encoded value starts with v3: prefix", () => { + const encoded = encodeMessageKeyValue({ + queue: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION", + workerQueue: "env_xyz", + }); + + expect(encoded.startsWith("v3:")).toBe(true); + }); + + test("decodeMessageKeyValue returns undefined for JSON", () => { + expect(decodeMessageKeyValue('{"version":"2","runId":"run_123"}')).toBeUndefined(); + }); + + test("decodeMessageKeyValue returns undefined for malformed v3", () => { + expect(decodeMessageKeyValue("v3:only_two_parts")).toBeUndefined(); + }); + }); + + describe("isV3MessageKeyValue", () => { + test("returns true for v3 encoded message key value", () => { + const encoded = encodeMessageKeyValue({ + queue: "{org:o1}:proj:p1:env:e1:queue:task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION", + workerQueue: "env_xyz", + }); + expect(isV3MessageKeyValue(encoded)).toBe(true); + }); + + test("returns false for JSON format", () => { + expect(isV3MessageKeyValue('{"version":"2","runId":"run_123"}')).toBe(false); + }); + + test("returns false for legacy v2 format", () => { + expect(isV3MessageKeyValue('{"version":"2"}')).toBe(false); + }); + }); + describe("encoded size comparison", () => { - test("v3 format is significantly smaller than full JSON", () => { + test("v3 message key format is significantly smaller than full JSON", () => { + const fullPayload = JSON.stringify({ + version: "2", + runId: "run_clxyz123abc456def789", + taskIdentifier: "my-background-task", + orgId: "org_clxyz123abc456def789", + projectId: "proj_clxyz123abc456def789", + environmentId: "env_clxyz123abc456def789", + environmentType: "PRODUCTION", + queue: + "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", + concurrencyKey: undefined, + timestamp: 1706812800000, + attempt: 0, + workerQueue: "env_clxyz123abc456def789", + }); + + const v3MessageKey = encodeMessageKeyValue({ + queue: + "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", + timestamp: 1706812800000, + attempt: 0, + environmentType: "PRODUCTION", + workerQueue: "env_clxyz123abc456def789", + }); + + // Full JSON is typically 400-600 bytes + // V3 message key should be ~200 bytes (includes the full queue key) + expect(v3MessageKey.length).toBeLessThan(fullPayload.length * 0.6); + + console.log(`Full JSON size: ${fullPayload.length} bytes`); + console.log(`V3 message key size: ${v3MessageKey.length} bytes`); + console.log( + `Reduction: ${((1 - v3MessageKey.length / fullPayload.length) * 100).toFixed(1)}%` + ); + }); + + test("v3 queue member format is significantly smaller than full JSON", () => { const fullPayload = JSON.stringify({ version: "2", runId: "run_clxyz123abc456def789", @@ -230,7 +341,8 @@ describe("messageEncoding", () => { projectId: "proj_clxyz123abc456def789", environmentId: "env_clxyz123abc456def789", environmentType: "PRODUCTION", - queue: "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", + queue: + "{org:org_clxyz123abc456def789}:proj:proj_clxyz123abc456def789:env:env_clxyz123abc456def789:queue:my-background-task", concurrencyKey: undefined, timestamp: 1706812800000, attempt: 0,