MCPcopy Index your code
hub / github.com/simstudioai/sim / runBackfill

Function runBackfill

apps/sim/scripts/backfill-table-order-keys.ts:39–121  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

37const WRITE_CHUNK_SIZE = 5000
38
39export async function runBackfill(): Promise<void> {
40 const dryRun = process.argv.includes('--dry-run')
41 const connectionString = process.env.DATABASE_URL ?? process.env.POSTGRES_URL
42 if (!connectionString) {
43 console.error('Missing DATABASE_URL or POSTGRES_URL')
44 process.exit(1)
45 }
46
47 const client = postgres(connectionString, {
48 prepare: false,
49 idle_timeout: 20,
50 connect_timeout: 30,
51 max: 5,
52 onnotice: () => {},
53 })
54 const db = drizzle(client)
55
56 const stats = { tables: 0, tablesKeyed: 0, rowsKeyed: 0, failed: 0 }
57
58 try {
59 // Tables that still have at least one un-keyed row.
60 const pending = await db
61 .selectDistinct({ tableId: userTableRows.tableId })
62 .from(userTableRows)
63 .where(isNull(userTableRows.orderKey))
64
65 console.log(
66 `Backfill starting — ${pending.length} table(s) with NULL order_key${dryRun ? ' [DRY RUN]' : ''}`
67 )
68
69 for (const { tableId } of pending) {
70 stats.tables += 1
71 try {
72 const keyed = await db.transaction(async (trx) => {
73 // Serialize with concurrent inserts on this table (same lock the app uses).
74 await trx.execute(
75 sql`SELECT pg_advisory_xact_lock(hashtextextended(${`user_table_rows_pos:${tableId}`}, 0))`
76 )
77 const rows = await trx
78 .select({ id: userTableRows.id })
79 .from(userTableRows)
80 .where(eq(userTableRows.tableId, tableId))
81 .orderBy(asc(userTableRows.position), asc(userTableRows.id))
82
83 if (rows.length === 0) return 0
84 const keys = nKeysBetween(null, null, rows.length)
85 if (dryRun) return rows.length
86
87 // Chunked UPDATE … FROM (VALUES …) mapping id → key (see WRITE_CHUNK_SIZE).
88 for (let start = 0; start < rows.length; start += WRITE_CHUNK_SIZE) {
89 const chunk = rows.slice(start, start + WRITE_CHUNK_SIZE)
90 const values = sql.join(
91 chunk.map((r, i) => sql`(${r.id}, ${keys[start + i]})`),
92 sql`, `
93 )
94 await trx.execute(sql`
95 UPDATE user_table_rows AS t
96 SET order_key = v.order_key

Callers 1

Calls 7

nKeysBetweenFunction · 0.90
getErrorMessageFunction · 0.90
errorMethod · 0.80
logMethod · 0.80
joinMethod · 0.80
executeMethod · 0.65
eqFunction · 0.50

Tested by

no test coverage detected