| 1131 | bufferChannel: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> |
| 1132 | ): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => { |
/**
 * Channel that reads chunks from upstream and publishes each one to `queue`
 * as a `Take` paired with a fresh `Deferred`, then loops by calling itself.
 *
 * `ref` holds the `Deferred` that was paired with the most recently accepted
 * queue entry. On upstream completion or failure the channel delegates to
 * `terminate`, which publishes the final take.
 *
 * NOTE(review): the deferreds are presumably completed by the consumer side
 * once it has processed the paired take -- that code is not part of this
 * block; confirm against `consumer`.
 *
 * @param queue - Destination for `[take, deferred]` pairs.
 * @param ref - Mutable cell tracking the deferred of the latest accepted pair.
 */
const producer = (
  queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>,
  ref: Ref.Ref<Deferred.Deferred<void>>
): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> => {
  /**
   * Publishes a terminal `take` (end or failure). It first waits on the
   * deferred currently stored in `ref`, then enqueues `take` with a new
   * deferred, records that deferred in `ref`, and waits on it before finishing.
   */
  const terminate = (
    take: Take.Take<A, E>
  ): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> => {
    // Step 1: block until the deferred currently held in `ref` is completed.
    const awaitPending = pipe(Ref.get(ref), Effect.tap(Deferred.await))

    // Steps 3-5: enqueue the terminal take with `signal`, remember `signal`,
    // and wait until `signal` itself is completed.
    const handOff = (signal: Deferred.Deferred<void>) =>
      pipe(
        Queue.offer(queue, [take, signal] as const),
        Effect.zipRight(Ref.set(ref, signal)),
        Effect.zipRight(Deferred.await(signal))
      )

    return pipe(
      awaitPending,
      // Step 2: allocate the deferred that accompanies the terminal take.
      Effect.zipRight(Deferred.make<void>()),
      Effect.flatMap(handOff),
      Effect.asVoid,
      core.fromEffect
    )
  }

  /**
   * Offers one chunk to the queue together with a fresh deferred. `ref` is
   * only updated when `Queue.offer` reports that the entry was added.
   */
  const enqueue = (input: Chunk.Chunk<A>) =>
    pipe(
      Deferred.make<void>(),
      Effect.flatMap((signal) =>
        pipe(
          Queue.offer(queue, [InternalTake.chunk(input), signal] as const),
          // Skip the `ref` update when the offer returned `false`.
          Effect.flatMap((accepted) => Effect.when(Ref.set(ref, signal), () => accepted))
        )
      ),
      Effect.asVoid,
      core.fromEffect
    )

  return core.readWithCause({
    // After publishing the chunk, keep reading by re-entering the producer.
    onInput: (input: Chunk.Chunk<A>) =>
      pipe(
        enqueue(input),
        core.flatMap(() => producer(queue, ref))
      ),
    onFailure: (cause) => terminate(InternalTake.failCause(cause)),
    onDone: () => terminate(InternalTake.end)
  })
}
| 1171 | const consumer = ( |
| 1172 | queue: Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]> |
| 1173 | ): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown, R> => { |