MCPcopy
hub / github.com/Effect-TS/effect / QueueImpl

Class QueueImpl

packages/effect/src/internal/queue.ts:67–304  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

65
66/** @internal */
67class QueueImpl<in out A> extends Effectable.Class<A> implements Queue.Queue<A> {
68 readonly [EnqueueTypeId] = enqueueVariance
69 readonly [DequeueTypeId] = dequeueVariance
70
71 constructor(
72 /** @internal */
73 readonly queue: Queue.BackingQueue<A>,
74 /** @internal */
75 readonly takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
76 /** @internal */
77 readonly shutdownHook: Deferred.Deferred<void>,
78 /** @internal */
79 readonly shutdownFlag: MutableRef.MutableRef<boolean>,
80 /** @internal */
81 readonly strategy: Queue.Strategy<A>
82 ) {
83 super()
84 }
85
86 pipe() {
87 return pipeArguments(this, arguments)
88 }
89
90 commit() {
91 return this.take
92 }
93
94 capacity(): number {
95 return this.queue.capacity()
96 }
97
98 get size(): Effect.Effect<number> {
99 return core.suspend(() => core.catchAll(this.unsafeSize(), () => core.interrupt))
100 }
101
102 unsafeSize() {
103 if (MutableRef.get(this.shutdownFlag)) {
104 return Option.none<number>()
105 }
106 return Option.some(
107 this.queue.length() -
108 MutableQueue.length(this.takers) +
109 this.strategy.surplusSize()
110 )
111 }
112
113 get isEmpty(): Effect.Effect<boolean> {
114 return core.map(this.size, (size) => size <= 0)
115 }
116
117 get isFull(): Effect.Effect<boolean> {
118 return core.map(this.size, (size) => size >= this.capacity())
119 }
120
121 get shutdown(): Effect.Effect<void> {
122 return core.uninterruptible(
123 core.withFiberRuntime((state) => {
124 pipe(this.shutdownFlag, MutableRef.set(true))

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected