(
trx: DbOrTx,
tableId: string,
rowId: string,
patch: Record<string, RowExecutionMetadata | null> | undefined,
guard?: { groupId: string; executionId: string }
)
| 223 | * (callers signal failure to the cell-task path). Returns `'wrote'` otherwise. |
| 224 | */ |
| 225 | export async function writeExecutionsPatch( |
| 226 | trx: DbOrTx, |
| 227 | tableId: string, |
| 228 | rowId: string, |
| 229 | patch: Record<string, RowExecutionMetadata | null> | undefined, |
| 230 | guard?: { groupId: string; executionId: string } |
| 231 | ): Promise<'wrote' | 'guard-rejected'> { |
| 232 | if (!patch) return 'wrote' |
| 233 | const entries = Object.entries(patch) |
| 234 | if (entries.length === 0) return 'wrote' |
| 235 | |
| 236 | for (const [gid, value] of entries) { |
| 237 | if (value === null) { |
| 238 | await trx |
| 239 | .delete(tableRowExecutions) |
| 240 | .where(and(eq(tableRowExecutions.rowId, rowId), eq(tableRowExecutions.groupId, gid)) as SQL) |
| 241 | continue |
| 242 | } |
| 243 | const insertValues = { |
| 244 | tableId, |
| 245 | rowId, |
| 246 | groupId: gid, |
| 247 | status: value.status, |
| 248 | executionId: value.executionId, |
| 249 | jobId: value.jobId, |
| 250 | workflowId: value.workflowId, |
| 251 | error: value.error, |
| 252 | runningBlockIds: value.runningBlockIds ?? [], |
| 253 | blockErrors: value.blockErrors ?? {}, |
| 254 | cancelledAt: value.cancelledAt ? new Date(value.cancelledAt) : null, |
| 255 | enrichmentDetails: value.enrichmentDetails ?? null, |
| 256 | updatedAt: new Date(), |
| 257 | } as const |
| 258 | |
| 259 | const isGuarded = guard && guard.groupId === gid |
| 260 | if (isGuarded) { |
| 261 | // Gate by guard semantics. The original JSONB guard had two AND'd |
| 262 | // clauses; we collapse them onto the upsert's WHERE so a non-matching |
| 263 | // existing row leaves the table untouched and we observe 0 affected. |
| 264 | const guardExecutionId = guard.executionId |
| 265 | const updated = await trx |
| 266 | .insert(tableRowExecutions) |
| 267 | .values(insertValues) |
| 268 | .onConflictDoUpdate({ |
| 269 | target: [tableRowExecutions.rowId, tableRowExecutions.groupId], |
| 270 | set: { |
| 271 | status: insertValues.status, |
| 272 | executionId: insertValues.executionId, |
| 273 | jobId: insertValues.jobId, |
| 274 | workflowId: insertValues.workflowId, |
| 275 | error: insertValues.error, |
| 276 | runningBlockIds: insertValues.runningBlockIds, |
| 277 | blockErrors: insertValues.blockErrors, |
| 278 | cancelledAt: insertValues.cancelledAt, |
| 279 | // Sticky: preserve a prior cascade breakdown when this write omits |
| 280 | // it (e.g. the running pickup stamp) so only an explicit detail |
| 281 | // overwrites it. Re-runs delete the row first, so this never serves |
| 282 | // stale detail across runs. |
no test coverage detected