| 420 | } |
| 421 | ): GroupBy.GroupBy<K, A, E, R> => { |
| 422 | const loop = ( |
| 423 | map: Map<K, Queue.Queue<Take.Take<A, E>>>, |
| 424 | outerQueue: Queue.Queue<Take.Take<readonly [K, Queue.Queue<Take.Take<A, E>>], E>> |
| 425 | ): Channel.Channel<never, Chunk.Chunk<A>, E, E, unknown, unknown, R> => |
| 426 | core.readWithCause({ |
| 427 | onInput: (input: Chunk.Chunk<A>) => |
| 428 | core.flatMap( |
| 429 | core.fromEffect( |
| 430 | Effect.forEach(groupByIterable(input, f), ([key, values]) => { |
| 431 | const innerQueue = map.get(key) |
| 432 | if (innerQueue === undefined) { |
| 433 | return pipe( |
| 434 | Queue.bounded<Take.Take<A, E>>(options?.bufferSize ?? 16), |
| 435 | Effect.flatMap((innerQueue) => |
| 436 | pipe( |
| 437 | Effect.sync(() => { |
| 438 | map.set(key, innerQueue) |
| 439 | }), |
| 440 | Effect.zipRight( |
| 441 | Queue.offer(outerQueue, take.of([key, innerQueue] as const)) |
| 442 | ), |
| 443 | Effect.zipRight( |
| 444 | pipe( |
| 445 | Queue.offer(innerQueue, take.chunk(values)), |
| 446 | Effect.catchSomeCause((cause) => |
| 447 | Cause.isInterruptedOnly(cause) ? |
| 448 | Option.some(Effect.void) : |
| 449 | Option.none() |
| 450 | ) |
| 451 | ) |
| 452 | ) |
| 453 | ) |
| 454 | ) |
| 455 | ) |
| 456 | } |
| 457 | return Effect.catchSomeCause( |
| 458 | Queue.offer(innerQueue, take.chunk(values)), |
| 459 | (cause) => |
| 460 | Cause.isInterruptedOnly(cause) ? |
| 461 | Option.some(Effect.void) : |
| 462 | Option.none() |
| 463 | ) |
| 464 | }, { discard: true }) |
| 465 | ), |
| 466 | () => loop(map, outerQueue) |
| 467 | ), |
| 468 | onFailure: (cause) => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))), |
| 469 | onDone: () => |
| 470 | core.fromEffect( |
| 471 | pipe( |
| 472 | Effect.forEach(map.entries(), ([_, innerQueue]) => |
| 473 | pipe( |
| 474 | Queue.offer(innerQueue, take.end), |
| 475 | Effect.catchSomeCause((cause) => |
| 476 | Cause.isInterruptedOnly(cause) ? |
| 477 | Option.some(Effect.void) : |
| 478 | Option.none() |
| 479 | ) |