(payload: TableBackfillPayload)
| 188 | * passes skip already-filled cells). |
| 189 | */ |
| 190 | export async function runTableBackfill(payload: TableBackfillPayload): Promise<void> { |
| 191 | const { jobId, tableId, groupId, outputs, overwrite, actorUserId } = payload |
| 192 | const requestId = generateId().slice(0, 8) |
| 193 | |
| 194 | try { |
| 195 | const table = await getTableById(tableId, { includeArchived: true }) |
| 196 | if (!table) throw new Error(`Backfill target table ${tableId} not found`) |
| 197 | |
| 198 | let processed = 0 |
| 199 | let updated = 0 |
| 200 | let afterRowId: string | undefined |
| 201 | |
| 202 | while (true) { |
| 203 | const owns = await updateJobProgress(tableId, processed, jobId) |
| 204 | if (!owns) throw new JobSupersededError() |
| 205 | |
| 206 | const execs = await selectCompletedExecPage(tableId, groupId, afterRowId, BACKFILL_PAGE_SIZE) |
| 207 | if (execs.length === 0) break |
| 208 | afterRowId = execs[execs.length - 1].rowId |
| 209 | |
| 210 | updated += await processBackfillPage({ |
| 211 | table, |
| 212 | outputs, |
| 213 | overwrite, |
| 214 | execs, |
| 215 | requestId, |
| 216 | actorUserId, |
| 217 | }) |
| 218 | processed += execs.length |
| 219 | } |
| 220 | |
| 221 | await updateJobProgress(tableId, processed, jobId) |
| 222 | const becameReady = await markJobReady(tableId, jobId) |
| 223 | if (becameReady) { |
| 224 | void appendTableEvent({ |
| 225 | kind: 'job', |
| 226 | type: 'backfill', |
| 227 | tableId, |
| 228 | jobId, |
| 229 | status: 'ready', |
| 230 | progress: updated, |
| 231 | }) |
| 232 | logger.info(`[${requestId}] Backfill complete`, { tableId, groupId, processed, updated }) |
| 233 | } else { |
| 234 | logger.info(`[${requestId}] Backfill finished but no longer owns the run`, { tableId, jobId }) |
| 235 | } |
| 236 | } catch (err) { |
| 237 | if (err instanceof JobSupersededError) { |
| 238 | logger.info(`[${requestId}] Backfill superseded/canceled; stopping`, { tableId, jobId }) |
| 239 | } else { |
| 240 | const message = getErrorMessage(err, 'Backfill failed') |
| 241 | logger.error(`[${requestId}] Backfill failed for table ${tableId}:`, err) |
| 242 | await markJobFailed(tableId, jobId, message).catch(() => {}) |
| 243 | void appendTableEvent({ |
| 244 | kind: 'job', |
| 245 | type: 'backfill', |
| 246 | tableId, |
| 247 | jobId, |
no test coverage detected