MCPcopy Index your code
hub / github.com/Effect-TS/effect / next

Function next

packages/effect/src/internal/stream.ts:6365–6415  ·  view source on GitHub ↗
(
    leftover: Option.Option<Chunk.Chunk<A>>,
    delimiterIndex: number
  )

Source from the content-addressed store, hash-verified

6363 <A, E, R>(self: Stream.Stream<A, E, R>, delimiter: Chunk.Chunk<A>) => Stream.Stream<Chunk.Chunk<A>, E, R>
6364>(2, <A, E, R>(self: Stream.Stream<A, E, R>, delimiter: Chunk.Chunk<A>): Stream.Stream<Chunk.Chunk<A>, E, R> => {
6365 const next = (
6366 leftover: Option.Option<Chunk.Chunk<A>>,
6367 delimiterIndex: number
6368 ): Channel.Channel<Chunk.Chunk<Chunk.Chunk<A>>, Chunk.Chunk<A>, E, E, unknown, unknown, R> =>
6369 core.readWithCause({
6370 onInput: (inputChunk: Chunk.Chunk<A>) => {
6371 let buffer: Array<Chunk.Chunk<A>> | undefined
6372 const [carry, delimiterCursor] = pipe(
6373 inputChunk,
6374 Chunk.reduce(
6375 [pipe(leftover, Option.getOrElse(() => Chunk.empty<A>())), delimiterIndex] as const,
6376 ([carry, delimiterCursor], a) => {
6377 const concatenated = pipe(carry, Chunk.append(a))
6378 if (
6379 delimiterCursor < delimiter.length &&
6380 Equal.equals(a, pipe(delimiter, Chunk.unsafeGet(delimiterCursor)))
6381 ) {
6382 if (delimiterCursor + 1 === delimiter.length) {
6383 if (buffer === undefined) {
6384 buffer = []
6385 }
6386 buffer.push(pipe(concatenated, Chunk.take(concatenated.length - delimiter.length)))
6387 return [Chunk.empty<A>(), 0] as const
6388 }
6389 return [concatenated, delimiterCursor + 1] as const
6390 }
6391 return [concatenated, Equal.equals(a, pipe(delimiter, Chunk.unsafeGet(0))) ? 1 : 0] as const
6392 }
6393 )
6394 )
6395 const output = buffer === undefined ? Chunk.empty<Chunk.Chunk<A>>() : Chunk.unsafeFromArray(buffer)
6396 return core.flatMap(
6397 core.write(output),
6398 () => next(Chunk.isNonEmpty(carry) ? Option.some(carry) : Option.none(), delimiterCursor)
6399 )
6400 },
6401 onFailure: (cause) =>
6402 Option.match(leftover, {
6403 onNone: () => core.failCause(cause),
6404 onSome: (chunk) =>
6405 channel.zipRight(
6406 core.write(Chunk.of(chunk)),
6407 core.failCause(cause)
6408 )
6409 }),
6410 onDone: (done) =>
6411 Option.match(leftover, {
6412 onNone: () => core.succeed(done),
6413 onSome: (chunk) => channel.zipRight(core.write(Chunk.of(chunk)), core.succeed(done))
6414 })
6415 })
6416 return new StreamImpl(pipe(toChannel(self), core.pipeTo(next(Option.none(), 0))))
6417})
6418

Callers 2

iterateFunction · 0.70
stream.tsFile · 0.70

Calls 11

unsafeGetMethod · 0.80
failCauseMethod · 0.80
syncMethod · 0.80
unsafeCloseMethod · 0.80
unsafeOpenMethod · 0.80
pipeFunction · 0.70
runForkFunction · 0.70
takeMethod · 0.65
writeMethod · 0.65
ofMethod · 0.65
addObserverMethod · 0.65

Tested by

no test coverage detected