(opts: {
table: TableDefinition
groupId: string
outputs: WorkflowGroupOutput[]
overwrite: boolean
requestId: string
actorUserId?: string | null
})
| 262 | * change" posture (the data stays backfillable). |
| 263 | */ |
| 264 | export async function maybeBackfillGroupOutputs(opts: { |
| 265 | table: TableDefinition |
| 266 | groupId: string |
| 267 | outputs: WorkflowGroupOutput[] |
| 268 | overwrite: boolean |
| 269 | requestId: string |
| 270 | actorUserId?: string | null |
| 271 | }): Promise<void> { |
| 272 | const { table, groupId, outputs, overwrite, requestId, actorUserId } = opts |
| 273 | if (outputs.length === 0) return |
| 274 | |
| 275 | const [{ count: completedCount }] = await db |
| 276 | .select({ count: count() }) |
| 277 | .from(tableRowExecutions) |
| 278 | .where( |
| 279 | and( |
| 280 | eq(tableRowExecutions.tableId, table.id), |
| 281 | eq(tableRowExecutions.groupId, groupId), |
| 282 | eq(tableRowExecutions.status, 'completed') |
| 283 | ) |
| 284 | ) |
| 285 | const total = Number(completedCount) |
| 286 | if (total === 0) return |
| 287 | |
| 288 | if (total <= BACKFILL_ASYNC_THRESHOLD_ROWS) { |
| 289 | // Inline: page without job machinery so memory stays bounded but the caller can await |
| 290 | // full consistency. |
| 291 | let afterRowId: string | undefined |
| 292 | while (true) { |
| 293 | const execs = await selectCompletedExecPage(table.id, groupId, afterRowId, BACKFILL_PAGE_SIZE) |
| 294 | if (execs.length === 0) break |
| 295 | afterRowId = execs[execs.length - 1].rowId |
| 296 | await processBackfillPage({ table, outputs, overwrite, execs, requestId, actorUserId }) |
| 297 | } |
| 298 | return |
| 299 | } |
| 300 | |
| 301 | const jobId = generateId() |
| 302 | const jobPayload: TableBackfillJobPayload = { groupId, outputs, overwrite } |
| 303 | const claimed = await markTableJobRunning(table.id, jobId, 'backfill', jobPayload) |
| 304 | if (!claimed) { |
| 305 | logger.warn( |
| 306 | `[${requestId}] Skipping backfill for table ${table.id} group ${groupId}: another job is running` |
| 307 | ) |
| 308 | return |
| 309 | } |
| 310 | |
| 311 | const payload: TableBackfillPayload = { |
| 312 | jobId, |
| 313 | tableId: table.id, |
| 314 | workspaceId: table.workspaceId, |
| 315 | groupId, |
| 316 | outputs, |
| 317 | overwrite, |
| 318 | actorUserId, |
| 319 | } |
| 320 | if (isTriggerDevEnabled) { |
| 321 | try { |
no test coverage detected