@internal
| 65 | |
| 66 | /** @internal */ |
| 67 | class QueueImpl<in out A> extends Effectable.Class<A> implements Queue.Queue<A> { |
| 68 | readonly [EnqueueTypeId] = enqueueVariance |
| 69 | readonly [DequeueTypeId] = dequeueVariance |
| 70 | |
/**
 * Builds a queue from its collaborators. Every argument is a TypeScript
 * parameter property, so each one is stored as a readonly field of the same
 * name; the body only has to invoke the base-class constructor.
 *
 * @param queue - backing queue; its `length()` and `capacity()` are read by this class
 * @param takers - mutable queue of `Deferred<A>` values; its length is subtracted in `unsafeSize`
 * @param shutdownHook - `Deferred<void>` (presumably completed on shutdown — confirm against `shutdown`)
 * @param shutdownFlag - mutable boolean; when true, `unsafeSize` reports `none`
 * @param strategy - strategy object; its `surplusSize()` is added in `unsafeSize`
 */
constructor(
  /** @internal */
  readonly queue: Queue.BackingQueue<A>,

  /** @internal */
  readonly takers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,

  /** @internal */
  readonly shutdownHook: Deferred.Deferred<void>,

  /** @internal */
  readonly shutdownFlag: MutableRef.MutableRef<boolean>,

  /** @internal */
  readonly strategy: Queue.Strategy<A>
) {
  // No extra initialisation: the parameter properties above do all the wiring.
  super()
}
| 85 | |
/**
 * Hands this queue, together with the raw `arguments` object of the call,
 * to `pipeArguments` and returns whatever that helper produces.
 */
pipe() {
  // `arguments` is used on purpose: the method declares no formal parameters.
  const piped = pipeArguments(this, arguments)
  return piped
}
| 89 | |
/**
 * Returns this queue's `take` member.
 *
 * NOTE(review): presumably this is the hook `Effectable.Class` uses when the
 * queue itself is run as an effect — confirm against the base class.
 */
commit() {
  const takeEffect = this.take
  return takeEffect
}
| 93 | |
/**
 * Reports the capacity of the backing queue.
 *
 * @returns the value of `this.queue.capacity()`
 */
capacity(): number {
  const backing = this.queue
  return backing.capacity()
}
| 97 | |
/**
 * Effect that yields the current size of the queue.
 *
 * The size is computed lazily (inside `core.suspend`) by calling
 * `unsafeSize()`. If that value fails when run as an effect, the failure is
 * replaced by `core.interrupt`. `unsafeSize()` returns `none` once the
 * shutdown flag is set, which is presumably the failing case — confirm.
 */
get size(): Effect.Effect<number> {
  return core.suspend(() => {
    const snapshot = this.unsafeSize()
    return core.catchAll(snapshot, () => core.interrupt)
  })
}
| 101 | |
/**
 * Synchronously computes the queue size.
 *
 * @returns `Option.none` when the shutdown flag is set; otherwise
 * `Option.some` of (backing queue length − number of takers + strategy
 * surplus). The arithmetic can go negative when takers outnumber the
 * queued items.
 */
unsafeSize() {
  const isShutdown = MutableRef.get(this.shutdownFlag)
  if (isShutdown) {
    return Option.none<number>()
  }

  // Read the three terms in the same order as the arithmetic uses them.
  const queued = this.queue.length()
  const waitingTakers = MutableQueue.length(this.takers)
  const surplus = this.strategy.surplusSize()

  return Option.some(queued - waitingTakers + surplus)
}
| 112 | |
/**
 * Effect that yields `true` when the value produced by `size` is zero or
 * negative, and `false` otherwise.
 */
get isEmpty(): Effect.Effect<boolean> {
  const hasNoItems = (current: number): boolean => current <= 0
  return core.map(this.size, hasNoItems)
}
| 116 | |
/**
 * Effect that yields `true` when the value produced by `size` has reached
 * or exceeded `capacity()`.
 */
get isFull(): Effect.Effect<boolean> {
  return core.map(this.size, (current) => {
    // Capacity is looked up only when the size is actually available.
    const limit = this.capacity()
    return current >= limit
  })
}
| 120 | |
| 121 | get shutdown(): Effect.Effect<void> { |
| 122 | return core.uninterruptible( |
| 123 | core.withFiberRuntime((state) => { |
| 124 | pipe(this.shutdownFlag, MutableRef.set(true)) |
nothing calls this directly
no outgoing calls
no test coverage detected
searching dependent graphs…