MCPcopy
hub / github.com/Effect-TS/effect / writer

Function writer

packages/effect/src/internal/stream.ts:1385–1408  ·  view source on GitHub ↗
(
    last: Option.Option<A>
  )

Source from the content-addressed store, hash-verified

1383 <A, E, R>(self: Stream.Stream<A, E, R>, f: (x: A, y: A) => boolean) => Stream.Stream<A, E, R>
1384>(2, <A, E, R>(self: Stream.Stream<A, E, R>, f: (x: A, y: A) => boolean): Stream.Stream<A, E, R> => {
1385 const writer = (
1386 last: Option.Option<A>
1387 ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E, E, void, unknown> =>
1388 core.readWithCause({
1389 onInput: (input: Chunk.Chunk<A>) => {
1390 const [newLast, newChunk] = Chunk.reduce(
1391 input,
1392 [last, Chunk.empty<A>()] as const,
1393 ([option, outputs], output) => {
1394 if (Option.isSome(option) && f(option.value, output)) {
1395 return [Option.some(output), outputs] as const
1396 }
1397 return [Option.some(output), pipe(outputs, Chunk.append(output))] as const
1398 }
1399 )
1400 if (Chunk.isEmpty(newChunk)) return writer(newLast)
1401 return core.flatMap(
1402 core.write(newChunk),
1403 () => writer(newLast)
1404 )
1405 },
1406 onFailure: core.failCause,
1407 onDone: () => core.void
1408 })
1409 return new StreamImpl(pipe(toChannel(self), core.pipeTo(writer(Option.none()))))
1410})
1411

Callers 1

stream.tsFile · 0.85

Calls 6

pipeFunction · 0.70
isEmptyMethod · 0.65
writeMethod · 0.65
mapMethod · 0.65
pollMethod · 0.65
fFunction · 0.50

Tested by

no test coverage detected