(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
)
| 536 | |
| 537 | /** Builds a Stream from an effectful `register` callback: every value handed to the `emit` passed to `register` is offered to a queue created from `bufferSize`, and the resulting stream is a channel that drains that queue until a failure/end signal is taken. @internal */ |
| 538 | export const asyncEffect = <A, E = never, R = never>( |
| 539 | register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>, |
| 540 | bufferSize?: number | "unbounded" | { |
| 541 | readonly bufferSize?: number | undefined |
| 542 | readonly strategy?: "dropping" | "sliding" | "suspend" | undefined |
| 543 | } | undefined |
| 544 | ): Stream.Stream<A, E, R> => |
| 545 | pipe( |
| 546 | Effect.acquireRelease( |
| 547 | queueFromBufferOptions<A, E>(bufferSize), /* acquire: queue built from the bufferSize option */ |
| 548 | (queue) => Queue.shutdown(queue) /* release: shut the queue down */ |
| 549 | ), |
| 550 | Effect.flatMap((output) => |
| 551 | pipe( |
| 552 | Effect.runtime<R>(), /* captured so the emit callback can run its effect via Runtime.runPromiseExit below */ |
| 553 | Effect.flatMap((runtime) => |
| 554 | pipe( |
| 555 | register( |
| 556 | emit.make((k) => |
| 557 | pipe( |
| 558 | InternalTake.fromPull(k), /* convert the emitted `k`; the result is offered to the queue on the next line */ |
| 559 | Effect.flatMap((take) => Queue.offer(output, take)), |
| 560 | Effect.asVoid, |
| 561 | Runtime.runPromiseExit(runtime) |
| 562 | ).then((exit) => { /* a failed exit that is not an interruption is rethrown (rejecting the promise); interruptions are ignored */ |
| 563 | if (Exit.isFailure(exit)) { |
| 564 | if (!Cause.isInterrupted(exit.cause)) { |
| 565 | throw Cause.squash(exit.cause) |
| 566 | } |
| 567 | } |
| 568 | }) |
| 569 | ) |
| 570 | ), |
| 571 | Effect.map(() => { /* after the effect returned by register succeeds, build the channel that drains the queue */ |
| 572 | const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe( |
| 573 | Queue.take(output), |
| 574 | Effect.flatMap(InternalTake.done), |
| 575 | Effect.match({ |
| 576 | onFailure: (maybeError) => /* shut the queue down, then finish with core.void (None) or core.fail with the error (Some) */ |
| 577 | pipe( |
| 578 | core.fromEffect(Queue.shutdown(output)), |
| 579 | channel.zipRight(Option.match(maybeError, { onNone: () => core.void, onSome: core.fail })) |
| 580 | ), |
| 581 | onSuccess: (chunk) => pipe(core.write(chunk), core.flatMap(() => loop)) /* write the chunk, then recurse to take the next one */ |
| 582 | }), |
| 583 | channel.unwrap |
| 584 | ) |
| 585 | return loop |
| 586 | }) |
| 587 | ) |
| 588 | ) |
| 589 | ) |
| 590 | ), |
| 591 | channel.unwrapScoped, |
| 592 | fromChannel |
| 593 | ) |
| 594 | |
| 595 | const queueFromBufferOptionsPush = <A, E>( |
nothing calls this directly
no test coverage detected