MCPcopy
hub / github.com/Effect-TS/effect / consumer

Function consumer

packages/effect/src/internal/stream.ts:1171–1188  ·  view source on GitHub ↗
(
    queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>
  )

Source from the content-addressed store, hash-verified

1169 })
1170 }
1171 const consumer = ( // Builds the channel that drains `queue` and re-emits the value of every successful take.
1172 queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]> // each entry pairs a Take with a Deferred<void>
1173 ): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => {
1174 const process: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe( // self-referential: the onSuccess branch below continues with `process` again
1175 core.fromEffect(Queue.take(queue)), // take the next [take, deferred] entry from the queue
1176 core.flatMap(([take, deferred]) =>
1177 channel.zipRight( // sequences the two channels below; presumably keeps the second one's result — confirm zipRight semantics
1178 core.fromEffect(Deferred.succeed(deferred, void 0)), // complete the entry's deferred with `undefined`; NOTE(review): presumably signals the producer that the entry was consumed — confirm against the enqueue side
1179 InternalTake.match(take, {
1180 onEnd: () => core.void, // end take: finish without re-entering `process`
1181 onFailure: core.failCause, // failure take: handed straight to core.failCause
1182 onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process)) // success take: write the value, then loop by running `process` again
1183 })
1184 )
1185 )
1186 )
1187 return process
1188 }
1189 return channel.unwrapScoped(
1190 pipe(
1191 scoped,

Callers 3

bufferSignal (Function) · 0.85
enqueue (Function) · 0.85
stream.ts (File) · 0.85

Calls 10

enqueue (Function) · 0.85
fromEffect (Method) · 0.80
failCause (Method) · 0.80
pipe (Function) · 0.70
take (Method) · 0.65
write (Method) · 0.65
pipe (Method) · 0.65
map (Method) · 0.65
join (Method) · 0.65
interrupt (Method) · 0.45

Tested by

no test coverage detected