MCPcopy Index your code
hub / github.com/triggerdotdev/trigger.dev / parallel

Method parallel

packages/trigger-sdk/src/io.ts:1042–1087  ·  view source on GitHub ↗
(
    cacheKey: string | any[],
    items: Array<TItem>,
    callback: (item: TItem, index: number) => Promise<T>,
    options?: Pick<RunTaskOptions, "name" | "properties">
  )

Source from the content-addressed store, hash-verified

1040 }
1041
1042 async parallel<T extends Json<T> | void, TItem>(
1043 cacheKey: string | any[],
1044 items: Array<TItem>,
1045 callback: (item: TItem, index: number) => Promise<T>,
1046 options?: Pick<RunTaskOptions, "name" | "properties">
1047 ): Promise<Array<T>> {
1048 const results = await this.runTask(
1049 cacheKey,
1050 async (task) => {
1051 const outcomes = await Promise.allSettled(
1052 items.map((item, index) => spaceOut(() => callback(item, index), index, 15))
1053 );
1054
1055 // If all the outcomes are fulfilled, return the values
1056 if (outcomes.every((outcome) => outcome.status === "fulfilled")) {
1057 return outcomes.map(
1058 (outcome) => (outcome as PromiseFulfilledResult<T>).value
1059 ) as Array<{}>;
1060 }
1061
1062 // If they any of the errors are non internal errors, throw the first one
1063 const nonInternalErrors = outcomes
1064 .filter((outcome) => outcome.status === "rejected" && !isTriggerError(outcome.reason))
1065 .map((outcome) => outcome as PromiseRejectedResult);
1066
1067 if (nonInternalErrors.length > 0) {
1068 throw nonInternalErrors[0].reason;
1069 }
1070
1071 // gather all the internal errors
1072 const internalErrors = outcomes
1073 .filter((outcome) => outcome.status === "rejected" && isTriggerError(outcome.reason))
1074 .map((outcome) => outcome as PromiseRejectedResult)
1075 .map((outcome) => outcome.reason as TriggerInternalError);
1076
1077 throw new ResumeWithParallelTaskError(task, internalErrors);
1078 },
1079 {
1080 name: "parallel",
1081 parallel: true,
1082 ...(options ?? {}),
1083 }
1084 );
1085
1086 return results as unknown as Array<T>;
1087 }
1088
1089 /** `io.runTask()` allows you to run a [Task](https://trigger.dev/docs/documentation/concepts/tasks) from inside a Job run. A Task is a resumable unit of a Run that can be retried, resumed and is logged. [Integrations](https://trigger.dev/docs/integrations) use Tasks internally to perform their actions.
1090 *

Callers 1

Calls 4

runTaskMethod · 0.95
isTriggerErrorFunction · 0.90
spaceOutFunction · 0.85
filterMethod · 0.65

Tested by

no test coverage detected