MCPcopy
hub / github.com/Effect-TS/effect / producer

Function producer

packages/effect/src/internal/stream.ts:1133–1170  ·  view source on GitHub ↗
(
    queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>,
    ref: Ref.Ref<Deferred.Deferred<void>>
  )

Source from the content-addressed store, hash-verified

1131 bufferChannel: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R>
1132): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => {
/**
 * Builds a channel that reads upstream chunks and forwards each one into
 * `queue` as a `[Take, Deferred]` pair, recording the most recently enqueued
 * Deferred in `ref`.
 *
 * - On input: enqueue the chunk with a fresh Deferred, then recurse to keep reading.
 * - On failure / done: enqueue a terminal Take via `terminate`.
 *
 * NOTE(review): nothing in this block completes the Deferreds; presumably the
 * consumer side does so when it dequeues a pair — confirm against `consumer`.
 *
 * @param queue - Queue receiving `[Take, Deferred]` pairs.
 * @param ref - Holds the Deferred paired with the latest enqueued Take.
 * @returns A channel that emits no output elements and feeds `queue` instead.
 */
1133 const producer = (
1134 queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>,
1135 ref: Ref.Ref<Deferred.Deferred<void>>
1136 ): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> => {
// Enqueues a terminal Take (end or failure cause): waits on the Deferred
// currently stored in `ref`, offers the Take with a new Deferred, stores that
// Deferred in `ref`, and then waits on it before the channel finishes.
1137 const terminate = (take: Take.Take<A, E>): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> =>
1138 pipe(
1139 Ref.get(ref), // read the Deferred recorded for the latest enqueued Take
1140 Effect.tap(Deferred.await), // wait for that Deferred before enqueuing the terminal Take
1141 Effect.zipRight(Deferred.make<void>()), // fresh Deferred to pair with the terminal Take
1142 Effect.flatMap((deferred) =>
1143 pipe(
1144 Queue.offer(queue, [take, deferred] as const),
1145 Effect.zipRight(Ref.set(ref, deferred)),
1146 Effect.zipRight(Deferred.await(deferred)) // wait until the terminal Take's Deferred completes
1147 )
1148 ),
1149 Effect.asVoid,
1150 core.fromEffect // lift the Effect into a Channel
1151 )
1152 return core.readWithCause({
1153 onInput: (input: Chunk.Chunk<A>) =>
1154 pipe(
1155 Deferred.make<void>(), // one Deferred per enqueued chunk
1156 Effect.flatMap(
1157 (deferred) =>
1158 pipe(
1159 Queue.offer(queue, [InternalTake.chunk(input), deferred] as const),
// Update `ref` only when the offer result `added` is true. Unlike
// `terminate`, this path does not await the Deferred.
// NOTE(review): presumably `added` is false when the queue rejects the
// element — confirm against the queue strategy chosen by the caller.
1160 Effect.flatMap((added) => pipe(Ref.set(ref, deferred), Effect.when(() => added)))
1161 )
1162 ),
1163 Effect.asVoid,
1164 core.fromEffect,
1165 core.flatMap(() => producer(queue, ref)) // recurse to read the next upstream chunk
1166 ),
1167 onFailure: (error) => terminate(InternalTake.failCause(error)), // forward the failure cause as a terminal Take
1168 onDone: () => terminate(InternalTake.end) // forward end-of-stream as a terminal Take
1169 })
1170 }
1171 const consumer = (
1172 queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>
1173 ): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => {

Callers 2

bufferSignalFunction · 0.85
stream.tsFile · 0.85

Calls 13

terminateFunction · 0.85
failCauseMethod · 0.80
fromEffectMethod · 0.80
pipeFunction · 0.70
makeMethod · 0.65
offerMethod · 0.65
chunkMethod · 0.65
setMethod · 0.65
pipeMethod · 0.65
takeMethod · 0.65
mapMethod · 0.65
failMethod · 0.65

Tested by

no test coverage detected