| 308 | ) |
| 309 | |
| 310 | const collectAllWhileWithLoop = <Z, In, L extends In, E, R, S>( /* Recursive channel loop: runs `self` as a channel, and for each run whose done value satisfies `p`, folds that value into an accumulator of type S with `f`. */ |
| 311 | self: Sink.Sink<Z, In, L, E, R>, /* sink that is converted with toChannel and run once per iteration */ |
| 312 | leftoversRef: Ref.Ref<Chunk.Chunk<In>>, /* overwritten with the flattened leftovers of every accepted run */ |
| 313 | upstreamDoneRef: Ref.Ref<boolean>, /* only read here; NOTE(review): presumably set by the caller once upstream input is exhausted - confirm */ |
| 314 | currentResult: S, /* accumulator carried in from the previous iteration (or the initial value) */ |
| 315 | p: Predicate<Z>, /* decides whether a run's done value is accepted */ |
| 316 | f: (s: S, z: Z) => S /* combines the accumulator with an accepted done value */ |
| 317 | ): Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E, never, S, unknown, R> => { /* every terminal branch below ends in channel.as(<an S value>) */ |
| 318 | return pipe( |
| 319 | toChannel(self), |
| 320 | channel.doneCollect, /* its result is destructured below as [leftovers, doneValue]; NOTE(review): presumably the collected outputs plus the done value - confirm */ |
| 321 | channel.foldChannel({ |
| 322 | onFailure: core.fail, /* a failure is handed straight to core.fail */ |
| 323 | onSuccess: ([leftovers, doneValue]) => |
| 324 | p(doneValue) /* accepted: record leftovers, then either finish or run the sink again */ |
| 325 | ? pipe( |
| 326 | core.fromEffect( |
| 327 | Ref.set(leftoversRef, Chunk.flatten(leftovers as Chunk.Chunk<Chunk.Chunk<In>>)) /* cast views the leftover chunks as chunks of In, matching leftoversRef's element type */ |
| 328 | ), |
| 329 | core.flatMap(() => |
| 330 | pipe( |
| 331 | core.fromEffect(Ref.get(upstreamDoneRef)), /* read only after leftoversRef has been updated */ |
| 332 | core.flatMap((upstreamDone) => { |
| 333 | const accumulatedResult = f(currentResult, doneValue) /* f is applied only on this accepted path */ |
| 334 | return upstreamDone |
| 335 | ? pipe(core.write(Chunk.flatten(leftovers)), channel.as(accumulatedResult)) /* upstreamDone is true: write the flattened leftovers and stop with the new accumulator; NOTE(review): channel.as presumably replaces the done value - confirm */ |
| 336 | : collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, accumulatedResult, p, f) /* otherwise recurse with the new accumulator; all other arguments are passed through unchanged */ |
| 337 | }) |
| 338 | ) |
| 339 | ) |
| 340 | ) |
| 341 | : pipe(core.write(Chunk.flatten(leftovers)), channel.as(currentResult)) /* rejected: write the flattened leftovers and stop with the accumulator unchanged; doneValue is not folded in and leftoversRef is not touched */ |
| 342 | }) |
| 343 | ) |
| 344 | } |
| 345 | |
| 346 | /** @internal */ |
| 347 | export const collectLeftover = <A, In, L, E, R>( |