* Drains audit events scoped to the organization: rows from any of the org's * workspaces, plus org-level rows (`workspace_id IS NULL`) where * `metadata->>'organizationId'` matches. Audit-log writers consistently set * `metadata.organizationId` for org-scoped actions even though the table has *
(input: SourcePageInput)
| 21 | * no dedicated FK column. |
| 22 | */ |
| 23 | async function* pages(input: SourcePageInput): AsyncIterable<AuditLogRow[]> { |
| 24 | const workspaceIds = await getOrganizationWorkspaceIds(input.organizationId) |
| 25 | |
| 26 | const orgScopedClause = and( |
| 27 | isNull(auditLog.workspaceId), |
| 28 | sql`${auditLog.metadata}->>'organizationId' = ${input.organizationId}` |
| 29 | ) |
| 30 | const scopeClause = |
| 31 | workspaceIds.length === 0 |
| 32 | ? orgScopedClause |
| 33 | : or(inArray(auditLog.workspaceId, workspaceIds), orgScopedClause) |
| 34 | |
| 35 | let cursor = decodeTimeCursor(input.cursor) |
| 36 | while (!input.signal.aborted) { |
| 37 | const cursorClause = timeCursorPredicate(auditLog.createdAt, auditLog.id, cursor) |
| 38 | |
| 39 | const rows = await dbReplica |
| 40 | .select() |
| 41 | .from(auditLog) |
| 42 | .where(and(scopeClause, timeCursorStabilityBound(auditLog.createdAt), cursorClause)) |
| 43 | .orderBy(...timeCursorOrderBy(auditLog.createdAt, auditLog.id)) |
| 44 | .limit(input.chunkSize) |
| 45 | |
| 46 | if (rows.length === 0) return |
| 47 | yield rows |
| 48 | const last = rows[rows.length - 1] |
| 49 | cursor = { ts: last.createdAt.toISOString(), id: last.id } |
| 50 | if (rows.length < input.chunkSize) return |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | export const auditLogsSource: DrainSource<AuditLogRow> = { |
| 55 | type: 'audit_logs', |
nothing calls this directly
no test coverage detected