(
register: (
emit: Emit.Emit<R, E, A, void>
) => Effect.Effect<void, never, R> | void,
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
)
| 479 | |
| 480 | /** @internal */ |
| 481 | export const _async = <A, E = never, R = never>( |
| 482 | register: ( |
| 483 | emit: Emit.Emit<R, E, A, void> |
| 484 | ) => Effect.Effect<void, never, R> | void, |
| 485 | bufferSize?: number | "unbounded" | { |
| 486 | readonly bufferSize?: number | undefined |
| 487 | readonly strategy?: "dropping" | "sliding" | "suspend" | undefined |
| 488 | } | undefined |
| 489 | ): Stream.Stream<A, E, R> => |
| 490 | Effect.acquireRelease( |
| 491 | queueFromBufferOptions<A, E>(bufferSize), |
| 492 | (queue) => Queue.shutdown(queue) |
| 493 | ).pipe( |
| 494 | Effect.flatMap((output) => |
| 495 | Effect.runtime<R>().pipe( |
| 496 | Effect.flatMap((runtime) => |
| 497 | Effect.sync(() => { |
| 498 | const runPromiseExit = Runtime.runPromiseExit(runtime) |
| 499 | const canceler = register(emit.make<R, E, A, void>((resume) => |
| 500 | InternalTake.fromPull(resume).pipe( |
| 501 | Effect.flatMap((take) => Queue.offer(output, take)), |
| 502 | Effect.asVoid, |
| 503 | runPromiseExit |
| 504 | ).then((exit) => { |
| 505 | if (Exit.isFailure(exit)) { |
| 506 | if (!Cause.isInterrupted(exit.cause)) { |
| 507 | throw Cause.squash(exit.cause) |
| 508 | } |
| 509 | } |
| 510 | }) |
| 511 | )) |
| 512 | return canceler |
| 513 | }) |
| 514 | ), |
| 515 | Effect.map((value) => { |
| 516 | const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = Queue.take(output).pipe( |
| 517 | Effect.flatMap((take) => InternalTake.done(take)), |
| 518 | Effect.match({ |
| 519 | onFailure: (maybeError) => |
| 520 | core.fromEffect(Queue.shutdown(output)).pipe( |
| 521 | channel.zipRight(Option.match(maybeError, { |
| 522 | onNone: () => core.void, |
| 523 | onSome: (error) => core.fail(error) |
| 524 | })) |
| 525 | ), |
| 526 | onSuccess: (chunk) => core.write(chunk).pipe(core.flatMap(() => loop)) |
| 527 | }), |
| 528 | channel.unwrap |
| 529 | ) |
| 530 | return fromChannel(loop).pipe(ensuring(value ?? Effect.void)) |
| 531 | }) |
| 532 | ) |
| 533 | ), |
| 534 | unwrapScoped |
| 535 | ) |
| 536 | |
| 537 | /** @internal */ |
| 538 | export const asyncEffect = <A, E = never, R = never>( |
nothing calls this directly
no test coverage detected