(executionId: string)
| 487 | } |
| 488 | |
/**
 * Resets the buffered event stream for an execution.
 *
 * When a Redis client is available, the current sequence counter and the stored
 * meta hash are read (each read falls back to a default if it rejects) and
 * `RESET_STREAM_SCRIPT` is evaluated against the events key, the meta key and the
 * execution's budget keys. When Redis is unavailable, the in-memory stream is
 * reset instead, provided the memory event buffer may be used.
 *
 * @param executionId - Identifier of the execution whose stream buffer is reset.
 * @returns `true` when the reset was applied; `false` when neither Redis nor the
 * memory buffer is usable, or when the Redis reset throws (the failure is logged).
 */
export async function resetExecutionStreamBuffer(executionId: string): Promise<boolean> {
  const redis = getRedisClient()

  if (redis) {
    try {
      // A rejected read is treated as sequence 0, so replay restarts at event 1.
      const rawSequence = await redis.get(getSeqKey(executionId)).catch(() => 0)
      const lastSequence = Number(rawSequence)
      let replayStartEventId = 1
      if (Number.isFinite(lastSequence)) {
        replayStartEventId = lastSequence + 1
      }

      const metaKey = getMetaKey(executionId)
      // A rejected read yields an empty hash, which leaves userId undefined below.
      const storedMeta = (await redis.hgetall(metaKey).catch(() => ({}))) as Record<string, string>
      let userId: string | undefined
      if (typeof storedMeta.userId === 'string') {
        userId = storedMeta.userId
      }

      const budgetReservation: ExecutionRedisBudgetReservation = {
        executionId,
        userId,
        category: 'event_buffer',
        operation: 'reset_events',
        bytes: 0,
        logger,
      }
      const budgetKeys = getExecutionRedisBudgetKeys(budgetReservation)

      // Every key handed to the script; its length is the numKeys argument.
      const scriptKeys = [getEventsKey(executionId), metaKey, ...budgetKeys]
      const resetTimestamp = new Date().toISOString()

      await redis.eval(
        RESET_STREAM_SCRIPT,
        scriptKeys.length,
        ...scriptKeys,
        String(replayStartEventId),
        resetTimestamp,
        TTL_SECONDS,
        getExecutionRedisBudgetLimits().ttlSeconds
      )
      return true
    } catch (error) {
      logger.warn('Failed to reset execution stream buffer', {
        executionId,
        error: toError(error).message,
      })
      return false
    }
  }

  // No Redis client: fall back to the in-memory buffer when it is permitted.
  if (!canUseMemoryEventBuffer()) {
    logger.warn('resetExecutionStreamBuffer: Redis client unavailable', { executionId })
    return false
  }

  const memoryStream = getMemoryStream(executionId)
  memoryStream.events = []
  // Replay resumes from the next event id the in-memory stream will hand out.
  memoryStream.meta = {
    status: 'active',
    replayStartEventId: memoryStream.nextEventId,
    updatedAt: new Date().toISOString(),
  }
  memoryStream.expiresAt = Date.now() + TTL_SECONDS * 1000
  return true
}
| 542 | |
| 543 | export async function setExecutionMeta( |
| 544 | executionId: string, |
no test coverage detected