| 1108 | }) |
| 1109 | |
/**
 * Re-emits `self` by way of a queue obtained from `toQueue` with the
 * `"unbounded"` strategy.
 *
 * The returned stream repeatedly takes one entry from that queue and
 * dispatches on it:
 * - end: the channel completes,
 * - failure: the channel fails with the carried cause,
 * - success: the carried value is written downstream and the loop continues.
 *
 * NOTE(review): the queue effect is handed to `channel.unwrapScoped`, so its
 * lifetime is presumably tied to the scope of the resulting channel — confirm
 * against `toQueue` / `unwrapScoped`.
 *
 * @param self - The stream to buffer.
 * @returns A stream of the same type that reads from the intermediate queue.
 */
const bufferUnbounded = <A, E, R>(self: Stream.Stream<A, E, R>): Stream.Stream<A, E, R> => {
  const scopedTakes = toQueue(self, { strategy: "unbounded" })

  const consumer = Effect.map(scopedTakes, (takes) => {
    // `drain` refers to itself lazily from inside `onSuccess`, so a single
    // channel value is reused for every iteration of the loop.
    const drain: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = core.flatMap(
      core.fromEffect(Queue.take(takes)),
      InternalTake.match({
        onEnd: () => core.void,
        onFailure: core.failCause,
        onSuccess: (chunk) => core.flatMap(core.write(chunk), () => drain)
      })
    )
    return drain
  })

  return new StreamImpl(channel.unwrapScoped(consumer))
}
| 1128 | |
| 1129 | const bufferSignal = <A, E, R>( |
| 1130 | scoped: Effect.Effect<Queue.Queue<readonly [Take.Take<A, E>, Deferred.Deferred<void>]>, never, Scope.Scope>, |