MCPcopy
hub / github.com/Effect-TS/effect / asyncScoped

Function asyncScoped

packages/effect/src/internal/stream.ts:641–700  ·  view source on GitHub ↗
(
  register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
  bufferSize?: number | "unbounded" | {
    readonly bufferSize?: number | undefined
    readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
  } | undefined
)

Source from the content-addressed store, hash-verified

639
640/** @internal */
641export const asyncScoped = <A, E = never, R = never>(
642 register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
643 bufferSize?: number | "unbounded" | {
644 readonly bufferSize?: number | undefined
645 readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
646 } | undefined
647): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
648 pipe(
649 Effect.acquireRelease(
650 queueFromBufferOptions<A, E>(bufferSize),
651 (queue) => Queue.shutdown(queue)
652 ),
653 Effect.flatMap((output) =>
654 pipe(
655 Effect.runtime<R>(),
656 Effect.flatMap((runtime) =>
657 pipe(
658 register(
659 emit.make((k) =>
660 pipe(
661 InternalTake.fromPull(k),
662 Effect.flatMap((take) => Queue.offer(output, take)),
663 Effect.asVoid,
664 Runtime.runPromiseExit(runtime)
665 ).then((exit) => {
666 if (Exit.isFailure(exit)) {
667 if (!Cause.isInterrupted(exit.cause)) {
668 throw Cause.squash(exit.cause)
669 }
670 }
671 })
672 )
673 ),
674 Effect.zipRight(Ref.make(false)),
675 Effect.flatMap((ref) =>
676 pipe(
677 Ref.get(ref),
678 Effect.map((isDone) =>
679 isDone ?
680 pull.end() :
681 pipe(
682 Queue.take(output),
683 Effect.flatMap(InternalTake.done),
684 Effect.onError(() =>
685 pipe(
686 Ref.set(ref, true),
687 Effect.zipRight(Queue.shutdown(output))
688 )
689 )
690 )
691 )
692 )
693 )
694 )
695 )
696 )
697 ),
698 scoped,

Callers

nothing calls this directly

Calls 14

queueFromBufferOptions · Function · 0.85
runtime · Method · 0.80
isInterrupted · Method · 0.80
onError · Method · 0.80
pipe · Function · 0.70
register · Function · 0.70
make · Method · 0.65
offer · Method · 0.65
get · Method · 0.65
map · Method · 0.65
end · Method · 0.65
take · Method · 0.65

Tested by

no test coverage detected