(iterable: Iterable<A>)
| 210 | } |
| 211 | |
| 212 | offerAll(iterable: Iterable<A>): Effect.Effect<boolean> { |
| 213 | return core.suspend(() => { |
| 214 | if (MutableRef.get(this.shutdownFlag)) { |
| 215 | return core.interrupt |
| 216 | } |
| 217 | const values = Arr.fromIterable(iterable) |
| 218 | const pTakers = this.queue.length() === 0 |
| 219 | ? Arr.fromIterable(unsafePollN(this.takers, values.length)) |
| 220 | : Arr.empty |
| 221 | const [forTakers, remaining] = pipe(values, Arr.splitAt(pTakers.length)) |
| 222 | for (let i = 0; i < pTakers.length; i++) { |
| 223 | const taker = (pTakers as any)[i] |
| 224 | const item = forTakers[i] |
| 225 | unsafeCompleteDeferred(taker, item) |
| 226 | } |
| 227 | if (remaining.length === 0) { |
| 228 | return core.succeed(true) |
| 229 | } |
| 230 | // Not enough takers, offer to the queue |
| 231 | const surplus = this.queue.offerAll(remaining) |
| 232 | unsafeCompleteTakers(this.strategy, this.queue, this.takers) |
| 233 | return Chunk.isEmpty(surplus) |
| 234 | ? core.succeed(true) |
| 235 | : this.strategy.handleSurplus(surplus, this.queue, this.takers, this.shutdownFlag) |
| 236 | }) |
| 237 | } |
| 238 | |
| 239 | get take(): Effect.Effect<A> { |
| 240 | return core.withFiberRuntime((state) => { |
nothing calls this directly
no test coverage detected