| 1135 | ref: Ref.Ref<Deferred.Deferred<void>> |
| 1136 | ): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> => { |
| 1137 | const terminate = (take: Take.Take<A, E>): Channel.Channel<never, Chunk.Chunk<A>, never, E, unknown, unknown, R> => |
| 1138 | pipe( |
| 1139 | Ref.get(ref), |
| 1140 | Effect.tap(Deferred.await), |
| 1141 | Effect.zipRight(Deferred.make<void>()), |
| 1142 | Effect.flatMap((deferred) => |
| 1143 | pipe( |
| 1144 | Queue.offer(queue, [take, deferred] as const), |
| 1145 | Effect.zipRight(Ref.set(ref, deferred)), |
| 1146 | Effect.zipRight(Deferred.await(deferred)) |
| 1147 | ) |
| 1148 | ), |
| 1149 | Effect.asVoid, |
| 1150 | core.fromEffect |
| 1151 | ) |
| 1152 | return core.readWithCause({ |
| 1153 | onInput: (input: Chunk.Chunk<A>) => |
| 1154 | pipe( |