(value: A)
| 178 | } |
| 179 | |
/**
 * Offers a single value to this queue.
 *
 * The work is wrapped in `core.suspend`, so none of the steps below run
 * until the returned effect is executed. At that point:
 *
 * 1. If the shutdown flag is set, the result is `core.interrupt`.
 * 2. If the backing queue is empty and a taker can be polled from
 *    `this.takers`, the value is passed to that taker via
 *    `unsafeCompleteDeferred` and the result is `core.succeed(true)`.
 * 3. Otherwise the value is offered to the backing queue and
 *    `unsafeCompleteTakers` is invoked. If the queue accepted the value the
 *    result is `core.succeed(true)`; if not, the outcome is delegated to
 *    `this.strategy.handleSurplus`.
 *
 * @param value - The value to offer.
 * @returns An effect producing a boolean; `true` on the direct hand-off and
 * accepted-offer paths, otherwise whatever `handleSurplus` yields.
 */
offer(value: A): Effect.Effect<boolean> {
  return core.suspend(() => {
    if (MutableRef.get(this.shutdownFlag)) {
      return core.interrupt
    }

    // Fast path: with nothing buffered, try to hand the value directly to a
    // pending taker instead of going through the queue.
    if (this.queue.length() === 0) {
      const pendingTaker = pipe(
        this.takers,
        MutableQueue.poll(MutableQueue.EmptyMutableQueue)
      )
      // `EmptyMutableQueue` is the sentinel passed to `poll`; any other
      // result is a real taker.
      if (pendingTaker !== MutableQueue.EmptyMutableQueue) {
        unsafeCompleteDeferred(pendingTaker, value)
        return core.succeed(true)
      }
    }

    // No taker was available for a direct hand-off, so offer to the queue.
    const accepted = this.queue.offer(value)
    unsafeCompleteTakers(this.strategy, this.queue, this.takers)
    if (accepted) {
      return core.succeed(true)
    }
    // The queue rejected the value; the strategy decides what happens to it.
    return this.strategy.handleSurplus([value], this.queue, this.takers, this.shutdownFlag)
  })
}
| 211 | |
| 212 | offerAll(iterable: Iterable<A>): Effect.Effect<boolean> { |
| 213 | return core.suspend(() => { |
nothing calls this directly
no test coverage detected