Execute a function with exponential backoff retry for rate-limiting errors. Only used for hosted key requests. Tracks rate limit events via telemetry. On a terminal upstream 429 (all retries exhausted), optionally re-enters the hosted-key queue (which waits for the per-workspace bucket to refill) and retries once with a fresh key.
( fn: () => Promise<T>, context: RetryContext, maxRetries = 3, baseDelayMs = 1000 )
| 423 | * re-queue the call instead of surfacing the error. |
| 424 | */ |
| 425 | async function executeWithRetry<T>( |
| 426 | fn: () => Promise<T>, |
| 427 | context: RetryContext, |
| 428 | maxRetries = 3, |
| 429 | baseDelayMs = 1000 |
| 430 | ): Promise<T> { |
| 431 | const { |
| 432 | requestId, |
| 433 | toolId, |
| 434 | provider, |
| 435 | envVarName, |
| 436 | executionContext, |
| 437 | reacquireAfterRetriesExhausted, |
| 438 | } = context |
| 439 | let lastError: unknown |
| 440 | |
| 441 | for (let attempt = 0; attempt <= maxRetries; attempt++) { |
| 442 | try { |
| 443 | return await fn() |
| 444 | } catch (error) { |
| 445 | lastError = error |
| 446 | |
| 447 | if (!isRateLimitError(error) || attempt === maxRetries) { |
| 448 | if (isRateLimitError(error) && attempt === maxRetries) { |
| 449 | if (reacquireAfterRetriesExhausted) { |
| 450 | try { |
| 451 | const requeued = await reacquireAfterRetriesExhausted() |
| 452 | if (requeued) { |
| 453 | logger.warn( |
| 454 | `[${requestId}] Upstream retries exhausted for ${toolId} (${envVarName}); re-queued and retrying once with fresh key` |
| 455 | ) |
| 456 | return (await requeued()) as T |
| 457 | } |
| 458 | } catch (requeueError) { |
| 459 | logger.error( |
| 460 | `[${requestId}] Re-queue after exhausted upstream retries failed for ${toolId}`, |
| 461 | { error: toError(requeueError).message } |
| 462 | ) |
| 463 | } |
| 464 | } |
| 465 | |
| 466 | PlatformEvents.hostedKeyUserThrottled({ |
| 467 | toolId, |
| 468 | reason: 'upstream_retries_exhausted', |
| 469 | provider, |
| 470 | userId: executionContext?.userId, |
| 471 | workspaceId: executionContext?.workspaceId, |
| 472 | workflowId: executionContext?.workflowId, |
| 473 | }) |
| 474 | } |
| 475 | throw error |
| 476 | } |
| 477 | |
| 478 | const delayMs = baseDelayMs * 2 ** attempt |
| 479 | |
| 480 | // Track throttling event via telemetry |
| 481 | PlatformEvents.hostedKeyRateLimited({ |
| 482 | toolId, |
no test coverage detected