MCPcopy Index your code
hub / github.com/Effect-TS/effect / QueueImpl

Class QueueImpl

packages/effect/src/internal/queue.ts:67–304  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

65
66/** @internal */
67class QueueImpl<in out A> extends Effectable.Class<A> implements Queue.Queue<A> {
68 readonly [EnqueueTypeId] = enqueueVariance
69 readonly [DequeueTypeId] = dequeueVariance
70
71 constructor(
72 /** @internal */
73 readonly queue: Queue.BackingQueue<A>,
74 /** @internal */
75 readonly takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
76 /** @internal */
77 readonly shutdownHook: Deferred.Deferred<void>,
78 /** @internal */
79 readonly shutdownFlag: MutableRef.MutableRef<boolean>,
80 /** @internal */
81 readonly strategy: Queue.Strategy<A>
82 ) {
83 super()
84 }
85
86 pipe() {
87 return pipeArguments(this, arguments)
88 }
89
90 commit() {
91 return this.take
92 }
93
94 capacity(): number {
95 return this.queue.capacity()
96 }
97
98 get size(): Effect.Effect<number> {
99 return core.suspend(() => core.catchAll(this.unsafeSize(), () => core.interrupt))
100 }
101
102 unsafeSize() {
103 if (MutableRef.get(this.shutdownFlag)) {
104 return Option.none<number>()
105 }
106 return Option.some(
107 this.queue.length() -
108 MutableQueue.length(this.takers) +
109 this.strategy.surplusSize()
110 )
111 }
112
113 get isEmpty(): Effect.Effect<boolean> {
114 return core.map(this.size, (size) => size <= 0)
115 }
116
117 get isFull(): Effect.Effect<boolean> {
118 return core.map(this.size, (size) => size >= this.capacity())
119 }
120
121 get shutdown(): Effect.Effect<void> {
122 return core.uninterruptible(
123 core.withFiberRuntime((state) => {
124 pipe(this.shutdownFlag, MutableRef.set(true))

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…