MCPcopy Index your code
hub / github.com/Effect-TS/effect / bufferUnbounded

Function bufferUnbounded

packages/effect/src/internal/stream.ts:1110–1127  ·  view source on GitHub ↗
(self: Stream.Stream<A, E, R>)

Source from the content-addressed store, hash-verified

1108})
1109
1110const bufferUnbounded = <A, E, R>(self: Stream.Stream<A, E, R>): Stream.Stream<A, E, R> => {
1111 const queue = toQueue(self, { strategy: "unbounded" })
1112 return new StreamImpl(
1113 channel.unwrapScoped(
1114 Effect.map(queue, (queue) => {
1115 const process: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
1116 core.fromEffect(Queue.take(queue)),
1117 core.flatMap(InternalTake.match({
1118 onEnd: () => core.void,
1119 onFailure: core.failCause,
1120 onSuccess: (value) => core.flatMap(core.write(value), () => process)
1121 }))
1122 )
1123 return process
1124 })
1125 )
1126 )
1127}
1128
1129const bufferSignal = <A, E, R>(
1130 scoped: Effect.Effect<Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>, never, Scope.Scope>,

Callers 1

stream.tsFile · 0.85

Calls 6

toQueueFunction · 0.85
fromEffectMethod · 0.80
pipeFunction · 0.70
mapMethod · 0.65
takeMethod · 0.65
writeMethod · 0.65

Tested by

no test coverage detected