| 13 | * @since 1.0.0 |
| 14 | */ |
| 15 | export const asyncPauseResume = <A, E = never, R = never>( |
| 16 | register: (emit: { |
| 17 | readonly single: (item: A) => void |
| 18 | readonly chunk: (chunk: Chunk.Chunk<A>) => void |
| 19 | readonly array: (chunk: ReadonlyArray<A>) => void |
| 20 | readonly fail: (error: E) => void |
| 21 | readonly end: () => void |
| 22 | }) => { |
| 23 | readonly onInterrupt: Effect.Effect<void, never, R> |
| 24 | readonly onPause: Effect.Effect<void> |
| 25 | readonly onResume: Effect.Effect<void> |
| 26 | }, |
| 27 | bufferSize = 2 |
| 28 | ): Stream.Stream<A, E, R> => { |
| 29 | const EOF = Symbol() |
| 30 | return Effect.all([ |
| 31 | Queue.bounded<Chunk.Chunk<A> | typeof EOF>(bufferSize), |
| 32 | Deferred.make<never, Option.Option<E>>(), |
| 33 | Effect.runtime<never>() |
| 34 | ]).pipe( |
| 35 | Effect.flatMap(([queue, deferred, runtime]) => { |
| 36 | return Effect.async<never, Option.Option<E>, R>((cb) => { |
| 37 | const runFork = Runtime.runFork(runtime) |
| 38 | |
| 39 | // eslint-disable-next-line prefer-const |
| 40 | let effects: { |
| 41 | readonly onInterrupt: Effect.Effect<void, never, R> |
| 42 | readonly onPause: Effect.Effect<void> |
| 43 | readonly onResume: Effect.Effect<void> |
| 44 | } |
| 45 | |
| 46 | const offer = (chunk: Chunk.Chunk<A>) => |
| 47 | Queue.isFull(queue).pipe( |
| 48 | Effect.tap((full) => (full ? effects.onPause : Effect.void)), |
| 49 | Effect.zipRight(Queue.offer(queue, chunk)), |
| 50 | Effect.zipRight(effects.onResume) |
| 51 | ) |
| 52 | |
| 53 | effects = register({ |
| 54 | single: (item) => runFork(offer(Chunk.of(item))), |
| 55 | chunk: (chunk) => runFork(offer(chunk)), |
| 56 | array: (chunk) => runFork(offer(Chunk.unsafeFromArray(chunk))), |
| 57 | fail: (error) => cb(Effect.fail(Option.some(error))), |
| 58 | end: () => cb(Effect.fail(Option.none())) |
| 59 | }) |
| 60 | |
| 61 | return effects.onInterrupt |
| 62 | }).pipe( |
| 63 | Effect.ensuring(Queue.offer(queue, EOF)), |
| 64 | Effect.intoDeferred(deferred), |
| 65 | Effect.forkScoped, |
| 66 | Effect.as( |
| 67 | Stream.repeatEffectChunkOption( |
| 68 | Effect.flatMap( |
| 69 | Queue.take(queue), |
| 70 | (chunk) => chunk === EOF ? Deferred.await(deferred) : Effect.succeed(chunk) |
| 71 | ) |
| 72 | ) |