| 1383 | <A, E, R>(self: Stream.Stream<A, E, R>, f: (x: A, y: A) => boolean) => Stream.Stream<A, E, R> |
| 1384 | >(2, <A, E, R>(self: Stream.Stream<A, E, R>, f: (x: A, y: A) => boolean): Stream.Stream<A, E, R> => { |
| 1385 | const writer = ( |
| 1386 | last: Option.Option<A> |
| 1387 | ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E, E, void, unknown> => |
| 1388 | core.readWithCause({ |
| 1389 | onInput: (input: Chunk.Chunk<A>) => { |
| 1390 | const [newLast, newChunk] = Chunk.reduce( |
| 1391 | input, |
| 1392 | [last, Chunk.empty<A>()] as const, |
| 1393 | ([option, outputs], output) => { |
| 1394 | if (Option.isSome(option) && f(option.value, output)) { |
| 1395 | return [Option.some(output), outputs] as const |
| 1396 | } |
| 1397 | return [Option.some(output), pipe(outputs, Chunk.append(output))] as const |
| 1398 | } |
| 1399 | ) |
| 1400 | if (Chunk.isEmpty(newChunk)) return writer(newLast) |
| 1401 | return core.flatMap( |
| 1402 | core.write(newChunk), |
| 1403 | () => writer(newLast) |
| 1404 | ) |
| 1405 | }, |
| 1406 | onFailure: core.failCause, |
| 1407 | onDone: () => core.void |
| 1408 | }) |
| 1409 | return new StreamImpl(pipe(toChannel(self), core.pipeTo(writer(Option.none())))) |
| 1410 | }) |
| 1411 | |