// Module-level logger labelled 'SnapshotService'; the service's methods log through it.
const logger = createLogger('SnapshotService')
| 17 | |
| 18 | export class SnapshotService implements ISnapshotService { |
/**
 * Creates (or reuses) a snapshot for the given workflow state and returns
 * just the snapshot record.
 *
 * This is a convenience wrapper: it delegates to
 * `createSnapshotWithDeduplication` and discards everything in that result
 * except its `snapshot` field.
 *
 * @param workflowId - Identifier of the workflow the state belongs to.
 * @param state - Workflow state to snapshot.
 * @returns The `snapshot` field of the deduplicating call's result.
 */
async createSnapshot(
  workflowId: string,
  state: WorkflowState
): Promise<WorkflowExecutionSnapshot> {
  const { snapshot } = await this.createSnapshotWithDeduplication(workflowId, state)
  return snapshot
}
| 26 | |
| 27 | async createSnapshotWithDeduplication( |
| 28 | workflowId: string, |
| 29 | state: WorkflowState |
| 30 | ): Promise<SnapshotCreationResult> { |
| 31 | const stateHash = this.computeStateHash(state) |
| 32 | |
| 33 | const snapshotData: WorkflowExecutionSnapshotInsert = { |
| 34 | id: generateId(), |
| 35 | workflowId, |
| 36 | stateHash, |
| 37 | stateData: state, |
| 38 | } |
| 39 | |
| 40 | /** |
| 41 | * Insert the snapshot, or — when an identical (workflowId, stateHash) row |
| 42 | * already exists — return it without rewriting the large stateData jsonb. |
| 43 | * |
| 44 | * The hash is a sha256 of the normalized state, so an existing row's stateData |
| 45 | * is byte-identical; there is nothing to update. The previous implementation |
| 46 | * SET state_data on conflict, which rewrote the full (tens-of-KB) jsonb every |
| 47 | * run. We keep a single atomic upsert — so RETURNING always yields the row and |
| 48 | * there is no race with snapshot cleanup (unlike DO NOTHING + a follow-up |
| 49 | * select) — but SET only the small state_hash column to itself. Under Postgres |
| 50 | * MVCC the unchanged, TOASTed stateData is not rewritten: its existing |
| 51 | * out-of-line storage is reused, so the per-execution write drops from the |
| 52 | * full blob to a tiny heap tuple. |
| 53 | */ |
| 54 | const [upsertedSnapshot] = await db |
| 55 | .insert(workflowExecutionSnapshots) |
| 56 | .values(snapshotData) |
| 57 | .onConflictDoUpdate({ |
| 58 | target: [workflowExecutionSnapshots.workflowId, workflowExecutionSnapshots.stateHash], |
| 59 | set: { |
| 60 | stateHash: sql`excluded.state_hash`, |
| 61 | }, |
| 62 | }) |
| 63 | .returning() |
| 64 | |
| 65 | const isNew = upsertedSnapshot.id === snapshotData.id |
| 66 | |
| 67 | logger.info( |
| 68 | isNew |
| 69 | ? `Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})` |
| 70 | : `Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)` |
| 71 | ) |
| 72 | |
| 73 | return { |
| 74 | snapshot: { |
| 75 | ...upsertedSnapshot, |
nothing calls this directly
no outgoing calls
no test coverage detected