MCPcopy
hub / github.com/Effect-TS/effect / asyncPush

Function asyncPush

packages/effect/src/internal/stream.ts:613–638  ·  view source on GitHub ↗
(
  register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, E, R | Scope.Scope>,
  options?: {
    readonly bufferSize: "unbounded"
  } | {
    readonly bufferSize?: number | undefined
    readonly strategy?: "dropping" | "sliding" | undefined
  } | undefined
)

Source from the content-addressed store, hash-verified

611
/**
 * Builds a stream whose elements are pushed into it by a user-supplied
 * `register` callback.
 *
 * A queue is acquired from `queueFromBufferOptionsPush(options)` with
 * `Queue.shutdown` registered as its release action. `register` is then
 * invoked once with the push emitter produced by
 * `emit.makePush(queue, scheduler)`, where `scheduler` is read from
 * `FiberRef.currentScheduler`. The resulting stream repeatedly takes from the
 * queue:
 *
 * - an array of values is written downstream as a single chunk and the loop
 *   continues;
 * - a successful `Exit` ends the stream;
 * - a failed `Exit` fails the stream with that exit's cause.
 *
 * @param register - Receives the push emitter; the effect it returns may
 * require a `Scope`, which is excluded from the stream's requirements.
 * @param options - Buffer configuration forwarded untouched to
 * `queueFromBufferOptionsPush` (presumably selects queue capacity and overflow
 * strategy; confirm against that helper).
 * @returns A stream of the pushed values.
 *
 * @internal
 */
export const asyncPush = <A, E = never, R = never>(
  register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, E, R | Scope.Scope>,
  options?: {
    readonly bufferSize: "unbounded"
  } | {
    readonly bufferSize?: number | undefined
    readonly strategy?: "dropping" | "sliding" | undefined
  } | undefined
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
  Effect.acquireRelease(
    queueFromBufferOptionsPush<A, E>(options),
    Queue.shutdown
  ).pipe(
    // Hand the emitter to the caller; `tap` keeps the queue as the pipeline value.
    Effect.tap((buffer) =>
      FiberRef.getWith(
        FiberRef.currentScheduler,
        (scheduler) => register(emit.makePush(buffer, scheduler))
      )
    ),
    Effect.map((buffer) => {
      // Self-referential channel: the explicit annotation is what lets `drain`
      // be mentioned inside its own initializer.
      const drain: Channel.Channel<Chunk.Chunk<A>, unknown, E> = core.flatMap(
        Queue.take(buffer),
        (taken) => {
          if (!Exit.isExit(taken)) {
            // Plain batch of values: emit it as one chunk, then keep draining.
            return channel.zipRight(core.write(Chunk.unsafeFromArray(taken)), drain)
          }
          // An Exit is the termination signal: finish cleanly or propagate the cause.
          return Exit.isSuccess(taken) ? core.void : core.failCause(taken.cause)
        }
      )
      return drain
    }),
    channel.unwrapScoped,
    fromChannel
  )
639
640/** @internal */
641export const asyncScoped = <A, E = never, R = never>(

Callers

nothing calls this directly

Calls 7

failCause (Method) · 0.80
register (Function) · 0.70
pipe (Method) · 0.65
map (Method) · 0.65
take (Method) · 0.65
write (Method) · 0.65

Tested by

no test coverage detected