MCPcopy Index your code
hub / github.com/omkarcloud/botasaurus / drainQueue

Function drainQueue

js/botasaurus-js/src/task.ts:383–463  ·  view source on GitHub ↗
(
    state: { promises: Promise<any>[], draining: boolean },
    cleanup: () => void,
    removeItemFromSeenItemsSet: (item: any) => void,
    options: { output?: string | ((data: any, result: any) => void) | null, outputFormats?: FormatType[] | null },
    fnName: string,
    n: number | null = null
)

Source from the content-addressed store, hash-verified

381}
382
383export async function drainQueue(
384 state: { promises: Promise<any>[], draining: boolean },
385 cleanup: () => void,
386 removeItemFromSeenItemsSet: (item: any) => void,
387 options: { output?: string | ((data: any, result: any) => void) | null, outputFormats?: FormatType[] | null },
388 fnName: string,
389 n: number | null = null
390) {
391 if (n !== null && n < 1) {
392 throw new Error('n must be >= 1')
393 }
394
395 if (!state.promises.length){
396 return []
397 }
398
399
400 const result_list: any[] = []
401 const orignal_data: any[] = []
402 // Only partial drain if n is specified AND less than current promises count
403 const isPartialDrain = n !== null && n < state.promises.length
404
405 state.draining = true
406 try {
407 if (isPartialDrain) {
408 // Partial drain - take first n promises
409 let remaining = n!
410 while (state.promises.length > 0 && remaining > 0) {
411 const takeCount = Math.min(remaining, state.promises.length)
412 const currentPromises = state.promises.slice(0, takeCount)
413 state.promises = state.promises.slice(takeCount)
414 remaining -= takeCount
415
416 const results = await Promise.all(currentPromises)
417 for (let index = 0; index < results.length; index++) {
418 const { originalData, result } = results[index]
419 if (Array.isArray(originalData)) {
420 orignal_data.push(...originalData)
421 result_list.push(...result)
422 } else {
423 orignal_data.push(originalData)
424 result_list.push(result)
425 }
426 removeItemFromSeenItemsSet(originalData)
427 }
428 }
429 } else {
430 // Full drain - keep processing until no new promises are added
431 while (state.promises.length > 0) {
432 const currentPromises = state.promises
433 state.promises = [] // Reset before awaiting so new promises can be added
434 const results = await Promise.all(currentPromises)
435 for (let index = 0; index < results.length; index++) {
436 const { originalData, result } = results[index]
437 if (Array.isArray(originalData)) {
438 orignal_data.push(...originalData)
439 result_list.push(...result)
440 } else {

Callers 2

performPlaywrightFunction · 0.90
performTaskFunction · 0.85

Calls 5

flattenFunction · 0.90
writeOutputFunction · 0.90
cleanupFunction · 0.70
pushMethod · 0.45

Tested by

no test coverage detected