(payload: TableDeletePayload)
| 58 | * newer job took the table) returns quietly. |
| 59 | */ |
| 60 | export async function runTableDelete(payload: TableDeletePayload): Promise<void> { |
| 61 | const { jobId, tableId, workspaceId, filter, excludeRowIds, cutoff, maxRows } = payload |
| 62 | const requestId = generateId().slice(0, 8) |
| 63 | const budget = maxRows ?? Number.POSITIVE_INFINITY |
| 64 | |
| 65 | try { |
| 66 | const table = await getTableById(tableId, { includeArchived: true }) |
| 67 | if (!table) throw new Error(`Delete target table ${tableId} not found`) |
| 68 | |
| 69 | const filterClause = filter |
| 70 | ? buildFilterClause(filter, USER_TABLE_ROWS_SQL_NAME, table.schema.columns) |
| 71 | : undefined |
| 72 | const excluded = new Set(excludeRowIds ?? []) |
| 73 | |
| 74 | // Resume the persisted count: a retried attempt's earlier batches are already committed, |
| 75 | // so starting at zero would overwrite cumulative progress with this attempt's smaller |
| 76 | // number. Doubles as the initial ownership gate. |
| 77 | const resumed = await getJobProgress(tableId, jobId) |
| 78 | if (resumed === null) throw new JobSupersededError() |
| 79 | |
| 80 | let processed = resumed |
| 81 | let lastReported = resumed |
| 82 | let afterId: string | undefined |
| 83 | |
| 84 | while (processed < budget) { |
| 85 | // Ownership gate before every page: once this run loses the table (cancel/supersede), |
| 86 | // updateJobProgress returns false and we stop before deleting further. |
| 87 | const owns = await updateJobProgress(tableId, processed, jobId) |
| 88 | if (!owns) throw new JobSupersededError() |
| 89 | |
| 90 | const page = await selectRowIdPage({ |
| 91 | tableId, |
| 92 | workspaceId, |
| 93 | cutoff, |
| 94 | filterClause, |
| 95 | afterId, |
| 96 | limit: Math.min(TABLE_LIMITS.DELETE_PAGE_SIZE, budget - processed), |
| 97 | }) |
| 98 | if (page.length === 0) break |
| 99 | // Advance the keyset cursor past the whole page — excluded ids are skipped (not deleted), |
| 100 | // so the cursor must move even when nothing in the page is deletable. |
| 101 | afterId = page[page.length - 1] |
| 102 | |
| 103 | const toDelete = excluded.size > 0 ? page.filter((id) => !excluded.has(id)) : page |
| 104 | if (toDelete.length > 0) { |
| 105 | processed += await deletePageByIds(tableId, workspaceId, toDelete) |
| 106 | } |
| 107 | |
| 108 | if ( |
| 109 | processed - lastReported >= PROGRESS_INTERVAL_ROWS || |
| 110 | (lastReported === 0 && processed > 0) |
| 111 | ) { |
| 112 | lastReported = processed |
| 113 | void appendTableEvent({ |
| 114 | kind: 'job', |
| 115 | type: 'delete', |
| 116 | tableId, |
| 117 | jobId, |
no test coverage detected