| 288 | const timeout = (lastB: Option.Option<B>): Effect.Effect<C, Option.Option<never>, R2 | R3> => |
| 289 | scheduleDriver.next(lastB) |
| 290 | const scheduledAggregator = ( |
| 291 | sinkFiber: Fiber.RuntimeFiber<readonly [Chunk.Chunk<Chunk.Chunk<A | A2>>, B], E | E2>, |
| 292 | scheduleFiber: Fiber.RuntimeFiber<C, Option.Option<never>>, |
| 293 | scope: Scope.Scope |
| 294 | ): Channel.Channel<Chunk.Chunk<Either.Either<B, C>>, unknown, E | E2, unknown, unknown, unknown, R2 | R3> => { |
| 295 | const forkSink = pipe( |
| 296 | Ref.set(consumed, false), |
| 297 | Effect.zipRight(Ref.set(endAfterEmit, false)), |
| 298 | Effect.zipRight( |
| 299 | pipe( |
| 300 | handoffConsumer, |
| 301 | channel.pipeToOrFail(sink_.toChannel(sink)), |
| 302 | core.collectElements, |
| 303 | channel.run, |
| 304 | Effect.forkIn(scope) |
| 305 | ) |
| 306 | ) |
| 307 | ) |
| 308 | const handleSide = ( |
| 309 | leftovers: Chunk.Chunk<Chunk.Chunk<A | A2>>, |
| 310 | b: B, |
| 311 | c: Option.Option<C> |
| 312 | ): Channel.Channel<Chunk.Chunk<Either.Either<B, C>>, unknown, E | E2, unknown, unknown, unknown, R2 | R3> => |
| 313 | pipe( |
| 314 | Ref.set(sinkLeftovers, Chunk.flatten(leftovers)), |
| 315 | Effect.zipRight( |
| 316 | Effect.map(Ref.get(sinkEndReason), (reason) => { |
| 317 | switch (reason._tag) { |
| 318 | case SinkEndReason.OP_SCHEDULE_END: { |
| 319 | return pipe( |
| 320 | Effect.all([ |
| 321 | Ref.get(consumed), |
| 322 | forkSink, |
| 323 | pipe(timeout(Option.some(b)), Effect.forkIn(scope)) |
| 324 | ]), |
| 325 | Effect.map(([wasConsumed, sinkFiber, scheduleFiber]) => { |
| 326 | const toWrite = pipe( |
| 327 | c, |
| 328 | Option.match({ |
| 329 | onNone: (): Chunk.Chunk<Either.Either<B, C>> => Chunk.of(Either.right(b)), |
| 330 | onSome: (c): Chunk.Chunk<Either.Either<B, C>> => |
| 331 | Chunk.make(Either.right(b), Either.left(c)) |
| 332 | }) |
| 333 | ) |
| 334 | if (wasConsumed) { |
| 335 | return pipe( |
| 336 | core.write(toWrite), |
| 337 | core.flatMap(() => scheduledAggregator(sinkFiber, scheduleFiber, scope)) |
| 338 | ) |
| 339 | } |
| 340 | return scheduledAggregator(sinkFiber, scheduleFiber, scope) |
| 341 | }), |
| 342 | channel.unwrap |
| 343 | ) |
| 344 | } |
| 345 | case SinkEndReason.OP_UPSTREAM_END: { |
| 346 | return pipe( |
| 347 | Ref.get(consumed), |