MCPcopy
hub / github.com/Effect-TS/effect / collectAllWhileWithLoop

Function collectAllWhileWithLoop

packages/effect/src/internal/sink.ts:310–344  ·  view source on GitHub ↗
(
  self: Sink.Sink<Z, In, L, E, R>,
  leftoversRef: Ref.Ref<Chunk.Chunk<In>>,
  upstreamDoneRef: Ref.Ref<boolean>,
  currentResult: S,
  p: Predicate<Z>,
  f: (s: S, z: Z) => S
)

Source from the content-addressed store, hash-verified

308)
309
/**
 * Recursive loop that runs `self` repeatedly, folding each result it produces
 * into an accumulator of type `S` for as long as the predicate `p` accepts
 * that result.
 *
 * One iteration:
 *  1. `self` is converted with `toChannel` and piped through
 *     `channel.doneCollect`; the success value is destructured as
 *     `[leftovers, doneValue]`.
 *  2. A failure is re-raised unchanged via `core.fail`.
 *  3. If `p(doneValue)` is false, the flattened leftovers are written
 *     downstream and the channel completes with `currentResult` as-is
 *     (the rejected `doneValue` is NOT folded in).
 *  4. If `p(doneValue)` is true, the flattened leftovers are stored in
 *     `leftoversRef`, `upstreamDoneRef` is read, and
 *     `f(currentResult, doneValue)` is computed. When upstream is done the
 *     flattened leftovers are written and the channel completes with the new
 *     accumulator; otherwise the function calls itself with the new
 *     accumulator and the same sink, refs, `p` and `f`.
 *
 * @param self - Sink that is re-run on every iteration.
 * @param leftoversRef - Ref overwritten with the flattened leftovers of each
 *   accepted run. Who reads it is not visible in this block; presumably the
 *   caller feeds it back into the next run (TODO confirm against the caller).
 * @param upstreamDoneRef - Ref read after each accepted run to decide whether
 *   to stop. It is only read here, never written.
 * @param currentResult - Accumulator carried into this iteration.
 * @param p - Predicate on the sink's result; a `false` result ends the loop.
 * @param f - Combines the accumulator with an accepted sink result.
 * @returns A channel that completes with the final accumulator `S`.
 */
310const collectAllWhileWithLoop = <Z, In, L extends In, E, R, S>(
311 self: Sink.Sink<Z, In, L, E, R>,
312 leftoversRef: Ref.Ref<Chunk.Chunk<In>>,
313 upstreamDoneRef: Ref.Ref<boolean>,
314 currentResult: S,
315 p: Predicate<Z>,
316 f: (s: S, z: Z) => S
317): Channel.Channel<Chunk.Chunk<L>, Chunk.Chunk<In>, E, never, S, unknown, R> => {
318 return pipe(
319 toChannel(self),
320 channel.doneCollect,
321 channel.foldChannel({
322 onFailure: core.fail,
323 onSuccess: ([leftovers, doneValue]) =>
324 p(doneValue)
325 ? pipe(
326 core.fromEffect(
// The cast retypes the leftovers' element type as `In` so the flattened chunk
// fits `leftoversRef`; the signature constrains `L extends In`.
327 Ref.set(leftoversRef, Chunk.flatten(leftovers as Chunk.Chunk<Chunk.Chunk<In>>))
328 ),
329 core.flatMap(() =>
330 pipe(
331 core.fromEffect(Ref.get(upstreamDoneRef)),
332 core.flatMap((upstreamDone) => {
// `f` is evaluated only after the predicate accepted `doneValue` and after
// both Ref operations above have been sequenced.
333 const accumulatedResult = f(currentResult, doneValue)
334 return upstreamDone
// Upstream finished: emit the leftovers and end with the updated accumulator.
335 ? pipe(core.write(Chunk.flatten(leftovers)), channel.as(accumulatedResult))
// Upstream still live: run the sink again, carrying the updated accumulator.
336 : collectAllWhileWithLoop(self, leftoversRef, upstreamDoneRef, accumulatedResult, p, f)
337 })
338 )
339 )
340 )
// Predicate rejected `doneValue`: emit the leftovers and end with the
// accumulator unchanged.
341 : pipe(core.write(Chunk.flatten(leftovers)), channel.as(currentResult))
342 })
343 )
344}
345
346/** @internal */
347export const collectLeftover = <A, In, L, E, R>(

Callers 1

sink.ts File · 0.85

Calls 8

fromEffect Method · 0.80
pipe Function · 0.70
toChannel Function · 0.70
set Method · 0.65
get Method · 0.65
write Method · 0.65
p Function · 0.50
f Function · 0.50

Tested by

no test coverage detected