MCPcopy Index your code
hub / github.com/Effect-TS/effect / _async

Function _async

packages/effect/src/internal/stream.ts:481–535  ·  view source on GitHub ↗
(
  register: (
    emit: Emit.Emit<R, E, A, void>
  ) => Effect.Effect<void, never, R> | void,
  bufferSize?: number | "unbounded" | {
    readonly bufferSize?: number | undefined
    readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
  } | undefined
)

Source from the content-addressed store, hash-verified

479
480/** @internal */
481export const _async = <A, E = never, R = never>(
482 register: (
483 emit: Emit.Emit<R, E, A, void>
484 ) => Effect.Effect<void, never, R> | void,
485 bufferSize?: number | "unbounded" | {
486 readonly bufferSize?: number | undefined
487 readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
488 } | undefined
489): Stream.Stream<A, E, R> =>
490 Effect.acquireRelease(
491 queueFromBufferOptions<A, E>(bufferSize),
492 (queue) => Queue.shutdown(queue)
493 ).pipe(
494 Effect.flatMap((output) =>
495 Effect.runtime<R>().pipe(
496 Effect.flatMap((runtime) =>
497 Effect.sync(() => {
498 const runPromiseExit = Runtime.runPromiseExit(runtime)
499 const canceler = register(emit.make<R, E, A, void>((resume) =>
500 InternalTake.fromPull(resume).pipe(
501 Effect.flatMap((take) => Queue.offer(output, take)),
502 Effect.asVoid,
503 runPromiseExit
504 ).then((exit) => {
505 if (Exit.isFailure(exit)) {
506 if (!Cause.isInterrupted(exit.cause)) {
507 throw Cause.squash(exit.cause)
508 }
509 }
510 })
511 ))
512 return canceler
513 })
514 ),
515 Effect.map((value) => {
516 const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = Queue.take(output).pipe(
517 Effect.flatMap((take) => InternalTake.done(take)),
518 Effect.match({
519 onFailure: (maybeError) =>
520 core.fromEffect(Queue.shutdown(output)).pipe(
521 channel.zipRight(Option.match(maybeError, {
522 onNone: () => core.void,
523 onSome: (error) => core.fail(error)
524 }))
525 ),
526 onSuccess: (chunk) => core.write(chunk).pipe(core.flatMap(() => loop))
527 }),
528 channel.unwrap
529 )
530 return fromChannel(loop).pipe(ensuring(value ?? Effect.void))
531 })
532 )
533 ),
534 unwrapScoped
535 )
536
537/** @internal */
538export const asyncEffect = <A, E = never, R = never>(

Callers

nothing calls this directly

Calls 15

queueFromBufferOptionsFunction · 0.85
runtimeMethod · 0.80
syncMethod · 0.80
isInterruptedMethod · 0.80
fromEffectMethod · 0.80
registerFunction · 0.70
fromChannelFunction · 0.70
pipeMethod · 0.65
makeMethod · 0.65
offerMethod · 0.65
mapMethod · 0.65
takeMethod · 0.65

Tested by

no test coverage detected