MCPcopy Index your code
hub / github.com/Effect-TS/effect / scheduledAggregator

Function scheduledAggregator

packages/effect/src/internal/stream.ts:290–424  ·  view source on GitHub ↗
(
          sinkFiber: Fiber.RuntimeFiber<readonly [Chunk.Chunk<Chunk.Chunk<A | A2>>, B], E | E2>,
          scheduleFiber: Fiber.RuntimeFiber<C, Option.Option<never>>,
          scope: Scope.Scope
        )

Source from the content-addressed store, hash-verified

288 const timeout = (lastB: Option.Option<B>): Effect.Effect<C, Option.Option<never>, R2 | R3> =>
289 scheduleDriver.next(lastB)
290 const scheduledAggregator = (
291 sinkFiber: Fiber.RuntimeFiber<readonly [Chunk.Chunk<Chunk.Chunk<A | A2>>, B], E | E2>,
292 scheduleFiber: Fiber.RuntimeFiber<C, Option.Option<never>>,
293 scope: Scope.Scope
294 ): Channel.Channel<Chunk.Chunk<Either.Either<B, C>>, unknown, E | E2, unknown, unknown, unknown, R2 | R3> => {
295 const forkSink = pipe(
296 Ref.set(consumed, false),
297 Effect.zipRight(Ref.set(endAfterEmit, false)),
298 Effect.zipRight(
299 pipe(
300 handoffConsumer,
301 channel.pipeToOrFail(sink_.toChannel(sink)),
302 core.collectElements,
303 channel.run,
304 Effect.forkIn(scope)
305 )
306 )
307 )
308 const handleSide = (
309 leftovers: Chunk.Chunk<Chunk.Chunk<A | A2>>,
310 b: B,
311 c: Option.Option<C>
312 ): Channel.Channel<Chunk.Chunk<Either.Either<B, C>>, unknown, E | E2, unknown, unknown, unknown, R2 | R3> =>
313 pipe(
314 Ref.set(sinkLeftovers, Chunk.flatten(leftovers)),
315 Effect.zipRight(
316 Effect.map(Ref.get(sinkEndReason), (reason) => {
317 switch (reason._tag) {
318 case SinkEndReason.OP_SCHEDULE_END: {
319 return pipe(
320 Effect.all([
321 Ref.get(consumed),
322 forkSink,
323 pipe(timeout(Option.some(b)), Effect.forkIn(scope))
324 ]),
325 Effect.map(([wasConsumed, sinkFiber, scheduleFiber]) => {
326 const toWrite = pipe(
327 c,
328 Option.match({
329 onNone: (): Chunk.Chunk<Either.Either<B, C>> => Chunk.of(Either.right(b)),
330 onSome: (c): Chunk.Chunk<Either.Either<B, C>> =>
331 Chunk.make(Either.right(b), Either.left(c))
332 })
333 )
334 if (wasConsumed) {
335 return pipe(
336 core.write(toWrite),
337 core.flatMap(() => scheduledAggregator(sinkFiber, scheduleFiber, scope))
338 )
339 }
340 return scheduledAggregator(sinkFiber, scheduleFiber, scope)
341 }),
342 channel.unwrap
343 )
344 }
345 case SinkEndReason.OP_UPSTREAM_END: {
346 return pipe(
347 Ref.get(consumed),

Callers 2

handleSideFunction · 0.85
stream.tsFile · 0.85

Calls 9

pipeFunction · 0.70
handleSideFunction · 0.70
setMethod · 0.65
joinMethod · 0.65
mapMethod · 0.65
offerMethod · 0.65
endMethod · 0.65
haltMethod · 0.65
interruptMethod · 0.45

Tested by

no test coverage detected