()
| 237 | } |
| 238 | |
| 239 | /** Effect that yields the next item polled from `queue`; when the queue is empty it registers a deferred in `takers` and awaits it. Resolves to `core.interrupt` when the shutdown flag is set. */ get take(): Effect.Effect<A> { |
| 240 | return core.withFiberRuntime((state) => { |
| 241 | if (MutableRef.get(this.shutdownFlag)) { |
| 242 | return core.interrupt |
| 243 | } |
| 244 | const item = this.queue.poll(MutableQueue.EmptyMutableQueue) |
| 245 | if (item !== MutableQueue.EmptyMutableQueue) { |
| 246 | this.strategy.unsafeOnQueueEmptySpace(this.queue, this.takers) |
| 247 | return core.succeed(item) |
| 248 | } else { |
| 249 | // The poll above found no item: create a deferred, offer it to `takers`, then: |
| 250 | // - Run `unsafeCompleteTakers` in case a value was added since the poll |
| 251 | // - Interrupt if the shutdown flag is now set, otherwise wait for the deferred to be completed |
| 252 | // - On interruption, remove the deferred from `takers` again so it is not left registered |
| 253 | const deferred = core.deferredUnsafeMake<A>(state.id()) |
| 254 | return pipe( |
| 255 | core.suspend(() => { |
| 256 | pipe(this.takers, MutableQueue.offer(deferred)) |
| 257 | unsafeCompleteTakers(this.strategy, this.queue, this.takers) |
| 258 | return MutableRef.get(this.shutdownFlag) ? |
| 259 | core.interrupt : |
| 260 | core.deferredAwait(deferred) |
| 261 | }), |
| 262 | core.onInterrupt(() => { |
| 263 | return core.sync(() => unsafeRemove(this.takers, deferred)) |
| 264 | }) |
| 265 | ) |
| 266 | } |
| 267 | }) |
| 268 | } |
| 269 | |
| 270 | get takeAll(): Effect.Effect<Chunk.Chunk<A>> { |
| 271 | return core.suspend(() => { |
nothing calls this directly
no test coverage detected