(
leftover: Option.Option<Chunk.Chunk<A>>,
delimiterIndex: number
)
| 6363 | <A, E, R>(self: Stream.Stream<A, E, R>, delimiter: Chunk.Chunk<A>) => Stream.Stream<Chunk.Chunk<A>, E, R> |
| 6364 | >(2, <A, E, R>(self: Stream.Stream<A, E, R>, delimiter: Chunk.Chunk<A>): Stream.Stream<Chunk.Chunk<A>, E, R> => { |
// Builds the Knuth-Morris-Pratt failure table for `delimiter`: entry `i` is
// the length of the longest proper prefix of `delimiter[0..i]` that is also a
// suffix of it. Elements are compared with `Equal.equals`, exactly like the
// matching loop below.
const buildFailureTable = (): ReadonlyArray<number> => {
  const table = new Array<number>(delimiter.length).fill(0)
  let matched = 0
  for (let i = 1; i < delimiter.length; i++) {
    const item = pipe(delimiter, Chunk.unsafeGet(i))
    while (matched > 0 && !Equal.equals(item, pipe(delimiter, Chunk.unsafeGet(matched)))) {
      matched = table[matched - 1]
    }
    if (Equal.equals(item, pipe(delimiter, Chunk.unsafeGet(matched)))) {
      matched = matched + 1
    }
    table[i] = matched
  }
  return table
}

/**
 * Channel that reads chunks of `A` and writes the segments found between
 * occurrences of `delimiter` (the delimiter elements themselves are dropped).
 *
 * Assumes `delimiter` is non-empty: the first delimiter element is read with
 * `Chunk.unsafeGet` for every input element.
 *
 * @param leftover - elements read since the last complete delimiter that have
 *   not been emitted yet (this includes a partially matched delimiter).
 * @param delimiterIndex - how many leading delimiter elements are currently
 *   matched at the end of `leftover`.
 * @param failureTable - KMP failure table for `delimiter`; computed on the
 *   first call and threaded through the recursion so it is built only once.
 */
const next = (
  leftover: Option.Option<Chunk.Chunk<A>>,
  delimiterIndex: number,
  failureTable: ReadonlyArray<number> = buildFailureTable()
): Channel.Channel<Chunk.Chunk<Chunk.Chunk<A>>, Chunk.Chunk<A>, E, E, unknown, unknown, R> =>
  core.readWithCause({
    onInput: (inputChunk: Chunk.Chunk<A>) => {
      // Completed segments found in this input chunk; allocated lazily.
      let buffer: Array<Chunk.Chunk<A>> | undefined
      const [carry, delimiterCursor] = pipe(
        inputChunk,
        Chunk.reduce(
          [pipe(leftover, Option.getOrElse(() => Chunk.empty<A>())), delimiterIndex] as const,
          ([carry, delimiterCursor], a) => {
            const concatenated = pipe(carry, Chunk.append(a))
            // On a mismatch fall back to the longest delimiter prefix that is
            // still a suffix of what has been read. Resetting straight to 0/1
            // would miss delimiters with a repeated prefix, e.g. [1, 1, 2]
            // inside 1, 1, 1, 2.
            let cursor = delimiterCursor
            while (cursor > 0 && !Equal.equals(a, pipe(delimiter, Chunk.unsafeGet(cursor)))) {
              cursor = failureTable[cursor - 1]
            }
            if (Equal.equals(a, pipe(delimiter, Chunk.unsafeGet(cursor)))) {
              cursor = cursor + 1
            }
            if (cursor === delimiter.length) {
              // Full delimiter matched: emit everything before it and start a
              // fresh segment (matches never overlap).
              if (buffer === undefined) {
                buffer = []
              }
              buffer.push(pipe(concatenated, Chunk.take(concatenated.length - delimiter.length)))
              return [Chunk.empty<A>(), 0] as const
            }
            return [concatenated, cursor] as const
          }
        )
      )
      const output = buffer === undefined ? Chunk.empty<Chunk.Chunk<A>>() : Chunk.unsafeFromArray(buffer)
      return core.flatMap(
        core.write(output),
        () => next(Chunk.isNonEmpty(carry) ? Option.some(carry) : Option.none(), delimiterCursor, failureTable)
      )
    },
    // Upstream failed: flush any pending elements as a final segment, then
    // propagate the original cause.
    onFailure: (cause) =>
      Option.match(leftover, {
        onNone: () => core.failCause(cause),
        onSome: (chunk) =>
          channel.zipRight(
            core.write(Chunk.of(chunk)),
            core.failCause(cause)
          )
      }),
    // Upstream completed: flush any pending elements as the last segment.
    onDone: (done) =>
      Option.match(leftover, {
        onNone: () => core.succeed(done),
        onSome: (chunk) => channel.zipRight(core.write(Chunk.of(chunk)), core.succeed(done))
      })
  })
| 6416 | return new StreamImpl(pipe(toChannel(self), core.pipeTo(next(Option.none(), 0)))) |
| 6417 | }) |
| 6418 |
no test coverage detected