()
| 1000 | } |
| 1001 | |
| 1002 | get take(): Effect.Effect<A> { |
| 1003 | return core.withFiberRuntime((state) => { |
| 1004 | if (MutableRef.get(this.shutdownFlag)) { |
| 1005 | return core.interrupt |
| 1006 | } |
| 1007 | if (this.replayWindow.remaining > 0) { |
| 1008 | const message = this.replayWindow.take()! |
| 1009 | return core.succeed(message) |
| 1010 | } |
| 1011 | const message = MutableQueue.isEmpty(this.pollers) |
| 1012 | ? this.subscription.poll(MutableQueue.EmptyMutableQueue) |
| 1013 | : MutableQueue.EmptyMutableQueue |
| 1014 | if (message === MutableQueue.EmptyMutableQueue) { |
| 1015 | const deferred = core.deferredUnsafeMake<A>(state.id()) |
| 1016 | return pipe( |
| 1017 | core.suspend(() => { |
| 1018 | pipe(this.pollers, MutableQueue.offer(deferred)) |
| 1019 | pipe(this.subscribers, addSubscribers(this.subscription, this.pollers)) |
| 1020 | this.strategy.unsafeCompletePollers( |
| 1021 | this.pubsub, |
| 1022 | this.subscribers, |
| 1023 | this.subscription, |
| 1024 | this.pollers |
| 1025 | ) |
| 1026 | return MutableRef.get(this.shutdownFlag) ? core.interrupt : core.deferredAwait(deferred) |
| 1027 | }), |
| 1028 | core.onInterrupt(() => core.sync(() => unsafeRemove(this.pollers, deferred))) |
| 1029 | ) |
| 1030 | } else { |
| 1031 | this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers) |
| 1032 | return core.succeed(message) |
| 1033 | } |
| 1034 | }) |
| 1035 | } |
| 1036 | |
| 1037 | get takeAll(): Effect.Effect<Chunk.Chunk<A>> { |
| 1038 | return core.suspend(() => { |
nothing calls this directly
no test coverage detected