MCPcopy
hub / github.com/Effect-TS/effect / bufferSignal

Function bufferSignal

packages/effect/src/internal/stream.ts:1129–1214  ·  view source on GitHub ↗
(
  scoped: Effect.Effect<Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>, never, Scope.Scope>,
  bufferChannel: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R>
)

Source from the content-addressed store, hash-verified

1127}
1128
1129const bufferSignal = <A, E, R>(
1130 scoped: Effect.Effect<Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>, never, Scope.Scope>,
1131 bufferChannel: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R>
1132): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => {
1133 const producer = (
1134 queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>,
1135 ref: Ref.Ref<Deferred.Deferred<void>>
1136 ): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> => {
1137 const terminate = (take: Take.Take<A, E>): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> =>
1138 pipe(
1139 Ref.get(ref),
1140 Effect.tap(Deferred.await),
1141 Effect.zipRight(Deferred.make<void>()),
1142 Effect.flatMap((deferred) =>
1143 pipe(
1144 Queue.offer(queue, [take, deferred] as const),
1145 Effect.zipRight(Ref.set(ref, deferred)),
1146 Effect.zipRight(Deferred.await(deferred))
1147 )
1148 ),
1149 Effect.asVoid,
1150 core.fromEffect
1151 )
1152 return core.readWithCause({
1153 onInput: (input: Chunk.Chunk<A>) =>
1154 pipe(
1155 Deferred.make<void>(),
1156 Effect.flatMap(
1157 (deferred) =>
1158 pipe(
1159 Queue.offer(queue, [InternalTake.chunk(input), deferred] as const),
1160 Effect.flatMap((added) => pipe(Ref.set(ref, deferred), Effect.when(() => added)))
1161 )
1162 ),
1163 Effect.asVoid,
1164 core.fromEffect,
1165 core.flatMap(() => producer(queue, ref))
1166 ),
1167 onFailure: (error) => terminate(InternalTake.failCause(error)),
1168 onDone: () => terminate(InternalTake.end)
1169 })
1170 }
1171 const consumer = (
1172 queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>
1173 ): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => {
1174 const process: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
1175 core.fromEffect(Queue.take(queue)),
1176 core.flatMap(([take, deferred]) =>
1177 channel.zipRight(
1178 core.fromEffect(Deferred.succeed(deferred, void 0)),
1179 InternalTake.match(take, {
1180 onEnd: () => core.void,
1181 onFailure: core.failCause,
1182 onSuccess: (value) => pipe(core.write(value), core.flatMap(() => process))
1183 })
1184 )
1185 )
1186 )

Callers 1

stream.ts · File · 0.85

Calls 4

producer · Function · 0.85
consumer · Function · 0.85
pipe · Function · 0.70
make · Method · 0.65

Tested by

no test coverage detected