( jobPayloads: DocumentProcessingPayload[], requestId: string )
| 432 | } |
| 433 | |
/**
 * Enqueues document-processing jobs through `tasks.batchTrigger`, sending the
 * payloads in consecutive chunks of at most `TRIGGER_BATCH_SIZE` items.
 *
 * Chunks are triggered one after another (each call is awaited before the
 * next starts). A chunk whose trigger call throws is logged and skipped; the
 * remaining chunks are still attempted. A rejection from
 * `resolveTriggerRegion()` is not caught here and propagates to the caller.
 *
 * @param jobPayloads - Payloads to enqueue, one job per payload.
 * @param requestId - Used as the log prefix and as part of each job's idempotency key.
 * @returns The number of payloads belonging to chunks whose trigger call succeeded.
 */
async function dispatchViaBatchTrigger(
  jobPayloads: DocumentProcessingPayload[],
  requestId: string
): Promise<number> {
  const region = await resolveTriggerRegion()
  const batchIds: string[] = []
  let dispatched = 0
  let offset = 0

  while (offset < jobPayloads.length) {
    const batch = jobPayloads.slice(offset, offset + TRIGGER_BATCH_SIZE)
    offset += TRIGGER_BATCH_SIZE

    try {
      const { batchId } = await tasks.batchTrigger<typeof processDocumentTask>(
        'knowledge-process-document',
        batch.map((job) => ({
          payload: job,
          options: {
            // The key combines documentId and requestId, so a retry within
            // this dispatch cannot enqueue the same document twice, while a
            // later sync (which carries a fresh requestId) is not blocked.
            idempotencyKey: `doc-process-${job.documentId}-${requestId}`,
            tags: [`knowledgeBaseId:${job.knowledgeBaseId}`, `documentId:${job.documentId}`],
            region,
          },
        }))
      )
      batchIds.push(batchId)
      dispatched += batch.length
    } catch (error) {
      // Best effort: record the failure and move on to the next chunk.
      logger.error(`[${requestId}] Failed to batchTrigger ${batch.length} document jobs`, {
        error: getErrorMessage(error),
      })
    }
  }

  // Only log when at least one chunk was accepted.
  if (batchIds.length > 0) {
    logger.info(`[${requestId}] Trigger.dev batches dispatched`, { batchIds })
  }

  return dispatched
}
| 473 | |
| 474 | async function dispatchInProcess( |
| 475 | jobPayloads: DocumentProcessingPayload[], |
no test coverage detected