()
| 37 | const WRITE_CHUNK_SIZE = 5000 /* max rows mapped id → key per chunked UPDATE statement in runBackfill */ |
| 38 | |
| 39 | export async function runBackfill(): Promise<void> { |
| 40 | const dryRun = process.argv.includes('--dry-run') |
| 41 | const connectionString = process.env.DATABASE_URL ?? process.env.POSTGRES_URL |
| 42 | if (!connectionString) { |
| 43 | console.error('Missing DATABASE_URL or POSTGRES_URL') |
| 44 | process.exit(1) |
| 45 | } |
| 46 | |
| 47 | const client = postgres(connectionString, { |
| 48 | prepare: false, |
| 49 | idle_timeout: 20, |
| 50 | connect_timeout: 30, |
| 51 | max: 5, |
| 52 | onnotice: () => {}, |
| 53 | }) |
| 54 | const db = drizzle(client) |
| 55 | |
| 56 | const stats = { tables: 0, tablesKeyed: 0, rowsKeyed: 0, failed: 0 } |
| 57 | |
| 58 | try { |
| 59 | // Tables that still have at least one un-keyed row. |
| 60 | const pending = await db |
| 61 | .selectDistinct({ tableId: userTableRows.tableId }) |
| 62 | .from(userTableRows) |
| 63 | .where(isNull(userTableRows.orderKey)) |
| 64 | |
| 65 | console.log( |
| 66 | `Backfill starting — ${pending.length} table(s) with NULL order_key${dryRun ? ' [DRY RUN]' : ''}` |
| 67 | ) |
| 68 | |
| 69 | for (const { tableId } of pending) { |
| 70 | stats.tables += 1 |
| 71 | try { |
| 72 | const keyed = await db.transaction(async (trx) => { |
| 73 | // Serialize with concurrent inserts on this table (same lock the app uses). |
| 74 | await trx.execute( |
| 75 | sql`SELECT pg_advisory_xact_lock(hashtextextended(${`user_table_rows_pos:${tableId}`}, 0))` |
| 76 | ) |
| 77 | const rows = await trx |
| 78 | .select({ id: userTableRows.id }) |
| 79 | .from(userTableRows) |
| 80 | .where(eq(userTableRows.tableId, tableId)) |
| 81 | .orderBy(asc(userTableRows.position), asc(userTableRows.id)) |
| 82 | |
| 83 | if (rows.length === 0) return 0 |
| 84 | const keys = nKeysBetween(null, null, rows.length) |
| 85 | if (dryRun) return rows.length |
| 86 | |
| 87 | // Chunked UPDATE … FROM (VALUES …) mapping id → key (see WRITE_CHUNK_SIZE). |
| 88 | for (let start = 0; start < rows.length; start += WRITE_CHUNK_SIZE) { |
| 89 | const chunk = rows.slice(start, start + WRITE_CHUNK_SIZE) |
| 90 | const values = sql.join( |
| 91 | chunk.map((r, i) => sql`(${r.id}, ${keys[start + i]})`), |
| 92 | sql`, ` |
| 93 | ) |
| 94 | await trx.execute(sql` |
| 95 | UPDATE user_table_rows AS t |
| 96 | SET order_key = v.order_key |
no test coverage detected