Externalize inline heavy execution_data into the large-value store.
( maxBatches: number )
| 69 | |
| 70 | /** Externalize inline heavy execution_data into the large-value store. */ |
| 71 | async function backfillTraceStorage( |
| 72 | maxBatches: number |
| 73 | ): Promise<{ migrated: number; failed: number }> { |
| 74 | let migrated = 0 |
| 75 | let failed = 0 |
| 76 | // Keyset cursor by id: every row is visited at most once per run, so rows that |
| 77 | // can't be externalized (storage error, oversized) aren't re-selected into an |
| 78 | // infinite loop. A fresh re-run (cursor reset) retries any that failed. |
| 79 | let lastId = '' |
| 80 | |
| 81 | for (let batch = 0; batch < maxBatches; batch++) { |
| 82 | const rows = await db |
| 83 | .select({ |
| 84 | id: workflowExecutionLogs.id, |
| 85 | workspaceId: workflowExecutionLogs.workspaceId, |
| 86 | workflowId: workflowExecutionLogs.workflowId, |
| 87 | executionId: workflowExecutionLogs.executionId, |
| 88 | executionData: workflowExecutionLogs.executionData, |
| 89 | }) |
| 90 | .from(workflowExecutionLogs) |
| 91 | .where( |
| 92 | and( |
| 93 | sql`${workflowExecutionLogs.endedAt} IS NOT NULL`, |
| 94 | // Skip deleted-workflow rows: externalization requires a workflowId. |
| 95 | sql`${workflowExecutionLogs.workflowId} IS NOT NULL`, |
| 96 | sql`${workflowExecutionLogs.executionData} ? 'traceSpans'`, |
| 97 | sql`NOT (${workflowExecutionLogs.executionData} ? ${TRACE_STORE_REF_KEY})`, |
| 98 | lastId ? gt(workflowExecutionLogs.id, lastId) : undefined |
| 99 | ) |
| 100 | ) |
| 101 | .orderBy(asc(workflowExecutionLogs.id)) |
| 102 | .limit(TRACE_BATCH_SIZE) |
| 103 | |
| 104 | if (rows.length === 0) break |
| 105 | |
| 106 | for (const row of rows) { |
| 107 | try { |
| 108 | const executionData = (row.executionData ?? {}) as Record<string, unknown> |
| 109 | // Derive the inline markers legacy rows lack so externalizeExecutionData |
| 110 | // carries them onto the slim row (they survive object expiry). |
| 111 | const traceSpanCount = countTraceSpans(executionData.traceSpans) |
| 112 | executionData.hasTraceSpans = traceSpanCount > 0 |
| 113 | executionData.traceSpanCount = traceSpanCount |
| 114 | stripSpanCosts(executionData.traceSpans) |
| 115 | // workspace_files.user_id (NOT NULL) needs the execution owner; legacy |
| 116 | // rows carry it under executionData.environment.userId. Rows without an |
| 117 | // owner can't be externalized — count them as failed and skip. |
| 118 | const environment = executionData.environment as { userId?: string } | undefined |
| 119 | const ownerUserId = environment?.userId |
| 120 | if (!ownerUserId) { |
| 121 | failed++ |
| 122 | continue |
| 123 | } |
| 124 | const slim = await externalizeExecutionData(executionData, { |
| 125 | workspaceId: row.workspaceId, |
| 126 | workflowId: row.workflowId, |
| 127 | executionId: row.executionId, |
| 128 | userId: ownerUserId, |
no test coverage detected