MCPcopy Index your code
hub / github.com/Effect-TS/effect / loop

Function loop

packages/effect/src/internal/groupBy.ts:422–484  ·  view source on GitHub ↗
(
      map: Map<K, Queue.Queue<Take.Take<A, E>>>,
      outerQueue: Queue.Queue<Take.Take<readonly [K, Queue.Queue<Take.Take<A, E>>], E>>
    )

Source from the content-addressed store, hash-verified

420 }
421 ): GroupBy.GroupBy<K, A, E, R> => {
422 const loop = (
423 map: Map<K, Queue.Queue<Take.Take<A, E>>>,
424 outerQueue: Queue.Queue<Take.Take<readonly [K, Queue.Queue<Take.Take<A, E>>], E>>
425 ): Channel.Channel<never, Chunk.Chunk<A>, E, E, unknown, unknown, R> =>
426 core.readWithCause({
427 onInput: (input: Chunk.Chunk<A>) =>
428 core.flatMap(
429 core.fromEffect(
430 Effect.forEach(groupByIterable(input, f), ([key, values]) => {
431 const innerQueue = map.get(key)
432 if (innerQueue === undefined) {
433 return pipe(
434 Queue.bounded<Take.Take<A, E>>(options?.bufferSize ?? 16),
435 Effect.flatMap((innerQueue) =>
436 pipe(
437 Effect.sync(() => {
438 map.set(key, innerQueue)
439 }),
440 Effect.zipRight(
441 Queue.offer(outerQueue, take.of([key, innerQueue] as const))
442 ),
443 Effect.zipRight(
444 pipe(
445 Queue.offer(innerQueue, take.chunk(values)),
446 Effect.catchSomeCause((cause) =>
447 Cause.isInterruptedOnly(cause) ?
448 Option.some(Effect.void) :
449 Option.none()
450 )
451 )
452 )
453 )
454 )
455 )
456 }
457 return Effect.catchSomeCause(
458 Queue.offer(innerQueue, take.chunk(values)),
459 (cause) =>
460 Cause.isInterruptedOnly(cause) ?
461 Option.some(Effect.void) :
462 Option.none()
463 )
464 }, { discard: true })
465 ),
466 () => loop(map, outerQueue)
467 ),
468 onFailure: (cause) => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))),
469 onDone: () =>
470 core.fromEffect(
471 pipe(
472 Effect.forEach(map.entries(), ([_, innerQueue]) =>
473 pipe(
474 Queue.offer(innerQueue, take.end),
475 Effect.catchSomeCause((cause) =>
476 Cause.isInterruptedOnly(cause) ?
477 Option.some(Effect.void) :
478 Option.none()
479 )

Callers 1

groupBy.ts · File · 0.70

Calls 10

fromEffect · Method · 0.80
sync · Method · 0.80
failCause · Method · 0.80
entries · Method · 0.80
pipe · Function · 0.70
get · Method · 0.65
set · Method · 0.65
offer · Method · 0.65
of · Method · 0.65
chunk · Method · 0.65

Tested by

no test coverage detected