MCPcopy Index your code
hub / github.com/Effect-TS/effect / asyncEffect

Function asyncEffect

packages/effect/src/internal/stream.ts:538–593  ·  view source on GitHub ↗
(
  register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
  bufferSize?: number | "unbounded" | {
    readonly bufferSize?: number | undefined
    readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
  } | undefined
)

Source from the content-addressed store, hash-verified

536
/**
 * Creates a stream from a callback-based `register` function whose setup is
 * itself an effect.
 *
 * A queue built by `queueFromBufferOptions(bufferSize)` is acquired for the
 * lifetime of the stream's scope and shut down on release. `register` is
 * handed an emitter; every value pushed through it is turned into a take with
 * `InternalTake.fromPull`, offered to the queue, and executed on the runtime
 * captured via `Effect.runtime<R>()`. The promise returned to the caller of
 * the emitter rejects with the squashed cause when that execution fails for
 * any reason other than interruption.
 *
 * After the effect returned by `register` succeeds, the stream repeatedly
 * takes from the queue: successful takes are written downstream as chunks,
 * while a failed take shuts the queue down and then either fails with the
 * carried error (`Some`) or finishes with `core.void` (`None`).
 *
 * @internal
 */
export const asyncEffect = <A, E = never, R = never>(
  register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
  bufferSize?: number | "unbounded" | {
    readonly bufferSize?: number | undefined
    readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
  } | undefined
): Stream.Stream<A, E, R> => {
  // The buffer lives exactly as long as the enclosing scope.
  const scopedQueue = Effect.acquireRelease(
    queueFromBufferOptions<A, E>(bufferSize),
    (buffer) => Queue.shutdown(buffer)
  )

  const scopedChannel = pipe(
    scopedQueue,
    Effect.flatMap((buffer) =>
      pipe(
        Effect.runtime<R>(),
        Effect.flatMap((rt) => {
          // Hand the user an emitter that pushes each pull result into the buffer.
          const registration = register(
            emit.make((pull) =>
              pipe(
                InternalTake.fromPull(pull),
                Effect.flatMap((item) => Queue.offer(buffer, item)),
                Effect.asVoid,
                Runtime.runPromiseExit(rt)
              ).then((result) => {
                // Interruption is swallowed; any other failure rejects the promise.
                if (Exit.isFailure(result) && !Cause.isInterrupted(result.cause)) {
                  throw Cause.squash(result.cause)
                }
              })
            )
          )

          return pipe(
            registration,
            Effect.map(() => {
              // Recursive consumer: one take per iteration until a failed take ends it.
              const drain: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
                Queue.take(buffer),
                Effect.flatMap(InternalTake.done),
                Effect.match({
                  onFailure: (maybeError) =>
                    pipe(
                      core.fromEffect(Queue.shutdown(buffer)),
                      channel.zipRight(
                        Option.match(maybeError, {
                          onNone: () => core.void,
                          onSome: core.fail
                        })
                      )
                    ),
                  onSuccess: (chunk) =>
                    pipe(
                      core.write(chunk),
                      core.flatMap(() => drain)
                    )
                }),
                channel.unwrap
              )
              return drain
            })
          )
        })
      )
    )
  )

  return pipe(scopedChannel, channel.unwrapScoped, fromChannel)
}
594
595const queueFromBufferOptionsPush = <A, E>(

Callers

nothing calls this directly

Calls 12

queueFromBufferOptions (Function) · 0.85
runtime (Method) · 0.80
isInterrupted (Method) · 0.80
fromEffect (Method) · 0.80
pipe (Function) · 0.70
register (Function) · 0.70
make (Method) · 0.65
offer (Method) · 0.65
map (Method) · 0.65
take (Method) · 0.65
write (Method) · 0.65
shutdown (Method) · 0.45

Tested by

no test coverage detected