MCPcopy Index your code
hub / github.com/simstudioai/sim / recordCumulativeUsage

Function recordCumulativeUsage

apps/sim/lib/billing/core/usage-log.ts:451–518  ·  view source on GitHub ↗
(
  params: RecordCumulativeUsageParams
)

Source from the content-addressed store, hash-verified

449 * process dies between legs — each leg's cost is durably recorded as it lands.
450 */
451export async function recordCumulativeUsage(
452 params: RecordCumulativeUsageParams
453): Promise<RecordCumulativeUsageResult> {
454 const { userId, workspaceId, source, model, cost, eventKey, metadata } = params
455
456 // Resolved before the locked transaction on purpose: resolving inside it
457 // ran the subscription lookups on the global pool while this tx already
458 // held a pooled connection plus the advisory lock, so under load N
459 // first-flush transactions each pinned a connection while waiting for one
460 // more — starving the pool and queueing every same-key flush (and the Go
461 // side's retries) behind the stall.
462 const billingContext = await resolveBillingContext(userId)
463
464 return db.transaction(async (tx) => {
465 // Serialize all flushes for this request (lock auto-releases at tx end),
466 // with a bounded wait so a pathological holder fails this flush fast and
467 // lets the caller retry instead of hanging the connection.
468 await tx.execute(
469 sql`select set_config('lock_timeout', ${`${CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS}ms`}, true)`
470 )
471 await tx.execute(sql`select pg_advisory_xact_lock(hashtextextended(${eventKey}, 0))`)
472
473 const [existing] = await tx
474 .select({ id: usageLog.id, cost: usageLog.cost })
475 .from(usageLog)
476 .where(eq(usageLog.eventKey, eventKey))
477 .limit(1)
478
479 const recorded = existing ? Number.parseFloat(existing.cost) : 0
480 const { shouldBill, delta, newTotal } = resolveCumulativeTopUp(recorded, cost)
481
482 if (!shouldBill) {
483 return { billed: false, delta: 0, total: recorded }
484 }
485
486 if (existing) {
487 // Top up the single row to the new (higher) cumulative; the
488 // period total is SUM(usage_log.cost), so this lifts it by the delta.
489 await tx
490 .update(usageLog)
491 .set({ cost: newTotal.toString(), metadata: metadata ?? null })
492 .where(eq(usageLog.id, existing.id))
493 } else {
494 // First flush for this request: insert the canonical row with the
495 // pre-resolved billing context. Runs in the same tx + advisory lock.
496 await recordUsage({
497 userId,
498 workspaceId,
499 tx,
500 billingEntity: billingContext.billingEntity,
501 billingPeriod: billingContext.billingPeriod,
502 entries: [
503 {
504 category: 'model',
505 source,
506 description: model,
507 cost: newTotal,
508 eventKey,

Callers 2

usage-log.test.ts · File · 0.90
updateCostInner · Function · 0.90

Calls 7

resolveBillingContext · Function · 0.85
resolveCumulativeTopUp · Function · 0.85
recordUsage · Function · 0.85
execute · Method · 0.65
set · Method · 0.65
eq · Function · 0.50
toString · Method · 0.45

Tested by

no test coverage detected