( params: RecordCumulativeUsageParams )
| 449 | * process dies between legs — each leg's cost is durably recorded as it lands. |
| 450 | */ |
| 451 | export async function recordCumulativeUsage( |
| 452 | params: RecordCumulativeUsageParams |
| 453 | ): Promise<RecordCumulativeUsageResult> { |
| 454 | const { userId, workspaceId, source, model, cost, eventKey, metadata } = params |
| 455 | |
| 456 | // Resolved before the locked transaction on purpose: resolving inside it |
| 457 | // ran the subscription lookups on the global pool while this tx already |
| 458 | // held a pooled connection plus the advisory lock, so under load N |
| 459 | // first-flush transactions each pinned a connection while waiting for one |
| 460 | // more — starving the pool and queueing every same-key flush (and the Go |
| 461 | // side's retries) behind the stall. |
| 462 | const billingContext = await resolveBillingContext(userId) |
| 463 | |
| 464 | return db.transaction(async (tx) => { |
| 465 | // Serialize all flushes for this request (lock auto-releases at tx end), |
| 466 | // with a bounded wait so a pathological holder fails this flush fast and |
| 467 | // lets the caller retry instead of hanging the connection. |
| 468 | await tx.execute( |
| 469 | sql`select set_config('lock_timeout', ${`${CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS}ms`}, true)` |
| 470 | ) |
| 471 | await tx.execute(sql`select pg_advisory_xact_lock(hashtextextended(${eventKey}, 0))`) |
| 472 | |
| 473 | const [existing] = await tx |
| 474 | .select({ id: usageLog.id, cost: usageLog.cost }) |
| 475 | .from(usageLog) |
| 476 | .where(eq(usageLog.eventKey, eventKey)) |
| 477 | .limit(1) |
| 478 | |
| 479 | const recorded = existing ? Number.parseFloat(existing.cost) : 0 |
| 480 | const { shouldBill, delta, newTotal } = resolveCumulativeTopUp(recorded, cost) |
| 481 | |
| 482 | if (!shouldBill) { |
| 483 | return { billed: false, delta: 0, total: recorded } |
| 484 | } |
| 485 | |
| 486 | if (existing) { |
| 487 | // Top up the single row to the new (higher) cumulative; the |
| 488 | // period total is SUM(usage_log.cost), so this lifts it by the delta. |
| 489 | await tx |
| 490 | .update(usageLog) |
| 491 | .set({ cost: newTotal.toString(), metadata: metadata ?? null }) |
| 492 | .where(eq(usageLog.id, existing.id)) |
| 493 | } else { |
| 494 | // First flush for this request: insert the canonical row with the |
| 495 | // pre-resolved billing context. Runs in the same tx + advisory lock. |
| 496 | await recordUsage({ |
| 497 | userId, |
| 498 | workspaceId, |
| 499 | tx, |
| 500 | billingEntity: billingContext.billingEntity, |
| 501 | billingPeriod: billingContext.billingPeriod, |
| 502 | entries: [ |
| 503 | { |
| 504 | category: 'model', |
| 505 | source, |
| 506 | description: model, |
| 507 | cost: newTotal, |
| 508 | eventKey, |
no test coverage detected