| 1169 | }) |
| 1170 | } |
/**
 * Builds the channel that drains `queue`.
 *
 * Each queue entry pairs a `Take` with a `Deferred<void>`. For every entry
 * the channel:
 *
 * 1. takes the next pair from the queue,
 * 2. completes the paired deferred with `undefined`,
 * 3. interprets the take:
 *    - end     -> the channel finishes (`core.void`),
 *    - failure -> the channel fails with the carried cause,
 *    - success -> the carried value is written downstream and the channel
 *                 loops to take the next entry.
 *
 * NOTE(review): completing the deferred presumably signals whoever enqueued
 * the entry that it has been received — confirm against the code that offers
 * to this queue.
 *
 * @param queue - Queue of `[take, deferred]` pairs to consume.
 * @returns A channel emitting the values of successful takes until an end or
 * failure take is dequeued.
 */
const consumer = (
  queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>
): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => {
  // Annotated explicitly because the channel refers to itself: the success
  // branch below continues with `loop`.
  const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = pipe(
    core.fromEffect(Queue.take(queue)),
    core.flatMap(([entry, signal]) => {
      // Runs first: complete the deferred that accompanied this entry.
      const acknowledge = core.fromEffect(Deferred.succeed(signal, void 0))
      // Runs second: decide how the channel proceeds based on the take.
      const continuation = InternalTake.match(entry, {
        onEnd: () => core.void,
        onFailure: core.failCause,
        // `loop` is only read when this callback runs, so the self-reference
        // is resolved lazily.
        onSuccess: (emitted) => pipe(core.write(emitted), core.flatMap(() => loop))
      })
      return channel.zipRight(acknowledge, continuation)
    })
  )
  return loop
}
| 1189 | return channel.unwrapScoped( |
| 1190 | pipe( |
| 1191 | scoped, |