MCP copy
hub / github.com/Effect-TS/effect / asyncPauseResume

Function asyncPauseResume

packages/sql/src/SqlStream.ts:15–78  ·  view source on GitHub ↗
(
  register: (emit: {
    readonly single: (item: A) => void
    readonly chunk: (chunk: Chunk.Chunk<A>) => void
    readonly array: (chunk: ReadonlyArray<A>) => void
    readonly fail: (error: E) => void
    readonly end: () => void
  }) => {
    readonly onInterrupt: Effect.Effect<void, never, R>
    readonly onPause: Effect.Effect<void>
    readonly onResume: Effect.Effect<void>
  },
  bufferSize = 2
)

Source from the content-addressed store, hash-verified

13 * @since 1.0.0
14 */
15export const asyncPauseResume = <A, E = never, R = never>(
16 register: (emit: {
17 readonly single: (item: A) => void
18 readonly chunk: (chunk: Chunk.Chunk<A>) => void
19 readonly array: (chunk: ReadonlyArray<A>) => void
20 readonly fail: (error: E) => void
21 readonly end: () => void
22 }) => {
23 readonly onInterrupt: Effect.Effect<void, never, R>
24 readonly onPause: Effect.Effect<void>
25 readonly onResume: Effect.Effect<void>
26 },
27 bufferSize = 2
28): Stream.Stream<A, E, R> => {
29 const EOF = Symbol()
30 return Effect.all([
31 Queue.bounded<Chunk.Chunk<A> | typeof EOF>(bufferSize),
32 Deferred.make<never, Option.Option<E>>(),
33 Effect.runtime<never>()
34 ]).pipe(
35 Effect.flatMap(([queue, deferred, runtime]) => {
36 return Effect.async<never, Option.Option<E>, R>((cb) => {
37 const runFork = Runtime.runFork(runtime)
38
39 // eslint-disable-next-line prefer-const
40 let effects: {
41 readonly onInterrupt: Effect.Effect<void, never, R>
42 readonly onPause: Effect.Effect<void>
43 readonly onResume: Effect.Effect<void>
44 }
45
46 const offer = (chunk: Chunk.Chunk<A>) =>
47 Queue.isFull(queue).pipe(
48 Effect.tap((full) => (full ? effects.onPause : Effect.void)),
49 Effect.zipRight(Queue.offer(queue, chunk)),
50 Effect.zipRight(effects.onResume)
51 )
52
53 effects = register({
54 single: (item) => runFork(offer(Chunk.of(item))),
55 chunk: (chunk) => runFork(offer(chunk)),
56 array: (chunk) => runFork(offer(Chunk.unsafeFromArray(chunk))),
57 fail: (error) => cb(Effect.fail(Option.some(error))),
58 end: () => cb(Effect.fail(Option.none()))
59 })
60
61 return effects.onInterrupt
62 }).pipe(
63 Effect.ensuring(Queue.offer(queue, EOF)),
64 Effect.intoDeferred(deferred),
65 Effect.forkScoped,
66 Effect.as(
67 Stream.repeatEffectChunkOption(
68 Effect.flatMap(
69 Queue.take(queue),
70 (chunk) => chunk === EOF ? Deferred.await(deferred) : Effect.succeed(chunk)
71 )
72 )

Callers 1

queryStream Function · 0.90

Calls 11

runtime Method · 0.80
offer Function · 0.70
pipe Method · 0.65
make Method · 0.65
of Method · 0.65
fail Method · 0.65
offer Method · 0.65
take Method · 0.65
register Function · 0.50
runFork Function · 0.50
await Method · 0.45

Tested by

no test coverage detected