MCPcopy Index your code
hub / github.com/Effect-TS/effect / fromTQueue

Function fromTQueue

packages/effect/src/internal/stream.ts:3210–3225  ·  view source on GitHub ↗
(queue: TQueue.TDequeue<A>)

Source from the content-addressed store, hash-verified

3208
3209/** @internal */
3210export const fromTQueue = <A>(queue: TQueue.TDequeue<A>): Stream.Stream<A> =>
3211 pipe(
3212 TQueue.take(queue),
3213 Effect.map(Chunk.of),
3214 Effect.catchAllCause((cause) =>
3215 pipe(
3216 TQueue.isShutdown(queue),
3217 Effect.flatMap((isShutdown) =>
3218 isShutdown && Cause.isInterrupted(cause) ?
3219 pull.end() :
3220 pull.failCause(cause)
3221 )
3222 )
3223 ),
3224 repeatEffectChunkOption
3225 )
3226
3227/** @internal */
3228export const fromSchedule = <A, R>(schedule: Schedule.Schedule<A, unknown, R>): Stream.Stream<A, never, R> =>

Callers 1

fromTPubSubFunction · 0.85

Calls 7

isInterruptedMethod · 0.80
failCauseMethod · 0.80
pipeFunction · 0.70
takeMethod · 0.65
mapMethod · 0.65
endMethod · 0.65
isShutdownMethod · 0.45

Tested by

no test coverage detected