(terminalStatus?: TerminalExecutionStreamStatus)
| 783 | let writeFailure: Error | null = null |
| 784 | |
| 785 | const doFlush = async (terminalStatus?: TerminalExecutionStreamStatus): Promise<boolean> => { |
| 786 | if (pending.length === 0) return true |
| 787 | const batch = pending |
| 788 | pending = [] |
| 789 | try { |
| 790 | const key = getEventsKey(executionId) |
| 791 | const zaddArgs: (string | number)[] = [] |
| 792 | let batchBytes = 0 |
| 793 | for (const entry of batch) { |
| 794 | const entryJson = getExecutionEventEntryJson(entry) |
| 795 | batchBytes += Buffer.byteLength(entryJson, 'utf8') |
| 796 | zaddArgs.push(entry.eventId, entryJson) |
| 797 | } |
| 798 | const budgetReservation: ExecutionRedisBudgetReservation = { |
| 799 | executionId, |
| 800 | userId: context.userId, |
| 801 | category: 'event_buffer', |
| 802 | operation: terminalStatus ? 'write_terminal_events' : 'write_events', |
| 803 | bytes: batchBytes, |
| 804 | logger, |
| 805 | } |
| 806 | const limits = getExecutionRedisBudgetLimits() |
| 807 | if (batchBytes > limits.maxSingleWriteBytes) { |
| 808 | throw new ExecutionResourceLimitError({ |
| 809 | resource: 'redis_key_bytes', |
| 810 | attemptedBytes: batchBytes, |
| 811 | limitBytes: limits.maxSingleWriteBytes, |
| 812 | }) |
| 813 | } |
| 814 | const budgetKeys = getExecutionRedisBudgetKeys(budgetReservation) |
| 815 | const flushResult = getFlushScriptResult( |
| 816 | await redis.eval( |
| 817 | FLUSH_EVENTS_SCRIPT, |
| 818 | 3 + budgetKeys.length, |
| 819 | key, |
| 820 | getSeqKey(executionId), |
| 821 | getMetaKey(executionId), |
| 822 | ...budgetKeys, |
| 823 | TTL_SECONDS, |
| 824 | EVENT_LIMIT, |
| 825 | new Date().toISOString(), |
| 826 | terminalStatus ?? '', |
| 827 | batchBytes, |
| 828 | limits.maxExecutionBytes, |
| 829 | limits.maxUserBytes, |
| 830 | limits.ttlSeconds, |
| 831 | ...zaddArgs |
| 832 | ) |
| 833 | ) |
| 834 | if (!flushResult.allowed) { |
| 835 | throw new ExecutionResourceLimitError({ |
| 836 | resource: |
| 837 | flushResult.resource === 'user_redis_bytes' |
| 838 | ? 'user_redis_bytes' |
| 839 | : 'execution_redis_bytes', |
| 840 | attemptedBytes: batchBytes, |
| 841 | currentBytes: flushResult.currentBytes ?? 0, |
| 842 | limitBytes: |
no test coverage detected