MCP · copy
hub / github.com/omkarcloud/botasaurus / taskQueue

Function taskQueue

js/botasaurus-js/src/task.ts:301–374  ·  view source on GitHub ↗
(options: TaskOptions<I>& { sequential?: boolean, skipDuplicateInput?: boolean })

Source from the content-addressed store, hash-verified

299 }
300}
301export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean, skipDuplicateInput?: boolean }) {
302 // Extract parallel from options - it controls queue-level concurrency, not passed to createTask
303 const { parallel: parallelOption, ...taskOptions } = options
304 const run = createTask<I>(taskOptions as TaskOptions<I>, true)
305 const performTask = () => {
306 let seenItems = new Set()
307 let lastPromise: Promise<any> = Promise.resolve()
308 const state = { promises: [] as any[], draining: false }
309 let sequential = 'sequential' in options ? options.sequential : false
310 const skipDuplicateInput = options.skipDuplicateInput ?? false
311
312 // Create concurrency limiter for parallel mode
313 const maxLimit = determineMaxLimit(parallelOption)
314
315 let limit: pLimit.Limit
316 if (maxLimit <=1) {
317 sequential = true
318 } else {
319 limit = pLimit(maxLimit)
320 }
321
322
323
324 const cleanup = () => {
325 state.promises = []
326 state.draining = false
327 seenItems.clear()
328 lastPromise = Promise.resolve()
329 }
330
331 return {
332 put: function (data: any, overrideOptions: Omit<TaskOptions<any>, 'run'> = {}) {
333 const uniqueData = getUniqueItems(data, skipDuplicateInput, seenItems)
334
335 // Handle case when all items are duplicates (skipDuplicateInput=true)
336 if (uniqueData === __SKIP__) {
337 return Promise.resolve(null)
338 }
339 if (Array.isArray(uniqueData) && uniqueData.length === 0) {
340 return Promise.resolve([])
341 }
342
343 let promise: Promise<any>
344 if (sequential) {
345 // runs sequentially
346 promise = lastPromise.then(() => run(uniqueData, overrideOptions))
347 lastPromise = promise
348
349 state.promises.push(promise)
350 return promise.then(x=>x.result)
351 } else {
352 if (Array.isArray(uniqueData)) {
353 promise = Promise.all(uniqueData.map(x=> limit(() => run(x, overrideOptions))))
354 state.promises.push(promise.then(results=>({originalData: uniqueData, result: results.map((x: any)=>x.result)})))
355 return promise.then(results=>results.map((x: any)=>x.result))
356 } else {
357 promise = limit(() => run(uniqueData, overrideOptions))
358

Callers 1

task.js File · 0.85

Calls 1

createTask Function · 0.70

Tested by

no test coverage detected