MCPcopy Index your code
hub / github.com/Effect-TS/effect / fromQueue

Function fromQueue

packages/effect/src/internal/stream.ts:3186–3207  ·  view source on GitHub ↗
(
  queue: Queue.Dequeue<A>,
  options?: {
    readonly maxChunkSize?: number | undefined
    readonly shutdown?: boolean | undefined
  }
)

Source from the content-addressed store, hash-verified

3184
3185/**
 * Creates a stream whose elements are taken from `queue`.
 *
 * The pipeline calls `Queue.takeBetween(queue, 1, max)`, where `max` is
 * `options.maxChunkSize`, or `DefaultChunkSize` when that is `null` or
 * `undefined`, and hands the resulting effect to `repeatEffectChunkOption`.
 *
 * If the take fails with any cause, `Queue.isShutdown(queue)` is consulted:
 * a shut-down queue together with an interruption cause maps to `pull.end()`
 * (presumably the end-of-stream signal; confirm against `pull`). Every other
 * combination is passed to `pull.failCause(cause)`.
 *
 * @param queue - The dequeue to read from.
 * @param options - Optional settings. `maxChunkSize` is the upper bound passed
 * to `Queue.takeBetween`; a truthy `shutdown` applies
 * `ensuring(Queue.shutdown(queue))` to the stream.
 * @returns A `Stream.Stream<A>` built from the repeated take effect.
 *
 * @internal
 */
3186export const fromQueue = <A>(
3187 queue: Queue.Dequeue<A>,
3188 options?: {
3189 readonly maxChunkSize?: number | undefined // upper bound for one take; falls back to DefaultChunkSize
3190 readonly shutdown?: boolean | undefined // when truthy, Queue.shutdown(queue) is attached via `ensuring`
3191 }
3192): Stream.Stream<A> =>
3193 pipe(
3194 Queue.takeBetween(queue, 1, options?.maxChunkSize ?? DefaultChunkSize), // lower bound 1, upper bound maxChunkSize or the default
3195 Effect.catchAllCause((cause) => // handles every failure cause of the take
3196 pipe(
3197 Queue.isShutdown(queue), // check the queue state before deciding how to surface the cause
3198 Effect.flatMap((isShutdown) =>
3199 isShutdown && Cause.isInterrupted(cause) ? // interruption on a shut-down queue is not reported as a failure
3200 pull.end() :
3201 pull.failCause(cause) // any other cause is forwarded as-is
3202 )
3203 )
3204 ),
3205 repeatEffectChunkOption, // receives the take effect built above
3206 options?.shutdown ? ensuring(Queue.shutdown(queue)) : identity // `identity` leaves the stream untouched when shutdown is not requested
3207 )
3208
3209/** @internal */
3210export const fromTQueue = <A>(queue: TQueue.TDequeue<A>): Stream.Stream<A> =>

Callers 2

stream.ts — File · 0.70
fromPubSub — Function · 0.70

Calls 7

isInterrupted — Method · 0.80
failCause — Method · 0.80
pipe — Function · 0.70
takeBetween — Method · 0.65
end — Method · 0.65
isShutdown — Method · 0.45
shutdown — Method · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…