| 299 | } |
| 300 | } |
| 301 | export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean, skipDuplicateInput?: boolean }) { |
| 302 | // Extract parallel from options - it controls queue-level concurrency, not passed to createTask |
| 303 | const { parallel: parallelOption, ...taskOptions } = options |
| 304 | const run = createTask<I>(taskOptions as TaskOptions<I>, true) |
| 305 | const performTask = () => { |
| 306 | let seenItems = new Set() |
| 307 | let lastPromise: Promise<any> = Promise.resolve() |
| 308 | const state = { promises: [] as any[], draining: false } |
| 309 | let sequential = 'sequential' in options ? options.sequential : false |
| 310 | const skipDuplicateInput = options.skipDuplicateInput ?? false |
| 311 | |
| 312 | // Create concurrency limiter for parallel mode |
| 313 | const maxLimit = determineMaxLimit(parallelOption) |
| 314 | |
| 315 | let limit: pLimit.Limit |
| 316 | if (maxLimit <=1) { |
| 317 | sequential = true |
| 318 | } else { |
| 319 | limit = pLimit(maxLimit) |
| 320 | } |
| 321 | |
| 322 | |
| 323 | |
| 324 | const cleanup = () => { |
| 325 | state.promises = [] |
| 326 | state.draining = false |
| 327 | seenItems.clear() |
| 328 | lastPromise = Promise.resolve() |
| 329 | } |
| 330 | |
| 331 | return { |
| 332 | put: function (data: any, overrideOptions: Omit<TaskOptions<any>, 'run'> = {}) { |
| 333 | const uniqueData = getUniqueItems(data, skipDuplicateInput, seenItems) |
| 334 | |
| 335 | // Handle case when all items are duplicates (skipDuplicateInput=true) |
| 336 | if (uniqueData === __SKIP__) { |
| 337 | return Promise.resolve(null) |
| 338 | } |
| 339 | if (Array.isArray(uniqueData) && uniqueData.length === 0) { |
| 340 | return Promise.resolve([]) |
| 341 | } |
| 342 | |
| 343 | let promise: Promise<any> |
| 344 | if (sequential) { |
| 345 | // runs sequentially |
| 346 | promise = lastPromise.then(() => run(uniqueData, overrideOptions)) |
| 347 | lastPromise = promise |
| 348 | |
| 349 | state.promises.push(promise) |
| 350 | return promise.then(x=>x.result) |
| 351 | } else { |
| 352 | if (Array.isArray(uniqueData)) { |
| 353 | promise = Promise.all(uniqueData.map(x=> limit(() => run(x, overrideOptions)))) |
| 354 | state.promises.push(promise.then(results=>({originalData: uniqueData, result: results.map((x: any)=>x.result)}))) |
| 355 | return promise.then(results=>results.map((x: any)=>x.result)) |
| 356 | } else { |
| 357 | promise = limit(() => run(uniqueData, overrideOptions)) |
| 358 | |