MCPcopy
hub / github.com/Effect-TS/effect / take

Method take

packages/effect/src/internal/pubsub.ts:1002–1035  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

1000 }
1001
/**
 * Builds an effect that yields the next message for this subscription.
 *
 * Resolution order, as implemented below:
 * 1. If `shutdownFlag` is set, the result is `core.interrupt`.
 * 2. If `replayWindow.remaining > 0`, a message is taken from the replay window.
 * 3. Otherwise the subscription is polled, but only when `pollers` is empty;
 *    when pollers are already queued this take is treated as having found nothing.
 * 4. If nothing was found, a deferred is offered to `pollers` and awaited; on
 *    interruption `unsafeRemove(this.pollers, deferred)` is run.
 *
 * @returns an effect that succeeds with a message of type `A`.
 */
1002 get take(): Effect.Effect<A> {
1003 return core.withFiberRuntime((state) => {
// Shut down already: do not touch the replay window or the subscription.
1004 if (MutableRef.get(this.shutdownFlag)) {
1005 return core.interrupt
1006 }
// Messages remaining in the replay window are served before polling the subscription.
1007 if (this.replayWindow.remaining > 0) {
1008 const message = this.replayWindow.take()!
1009 return core.succeed(message)
1010 }
// Poll the subscription only when no pollers are queued; otherwise use the
// EmptyMutableQueue sentinel so the waiting branch below is taken.
// NOTE(review): presumably this keeps earlier waiting takers ahead of this one — confirm.
1011 const message = MutableQueue.isEmpty(this.pollers)
1012 ? this.subscription.poll(MutableQueue.EmptyMutableQueue)
1013 : MutableQueue.EmptyMutableQueue
1014 if (message === MutableQueue.EmptyMutableQueue) {
// Nothing available: create a deferred tied to the current fiber's id and wait on it.
1015 const deferred = core.deferredUnsafeMake<A>(state.id())
1016 return pipe(
1017 core.suspend(() => {
// Order matters here: enqueue this deferred, register the subscription/pollers pair
// with the subscribers, then ask the strategy to complete any pollers it can.
1018 pipe(this.pollers, MutableQueue.offer(deferred))
1019 pipe(this.subscribers, addSubscribers(this.subscription, this.pollers))
1020 this.strategy.unsafeCompletePollers(
1021 this.pubsub,
1022 this.subscribers,
1023 this.subscription,
1024 this.pollers
1025 )
// The shutdown flag is read again after registration; if set, interrupt instead of awaiting.
// NOTE(review): presumably guards against a shutdown that happened since the first check — confirm.
1026 return MutableRef.get(this.shutdownFlag) ? core.interrupt : core.deferredAwait(deferred)
1027 }),
// If the waiting effect is interrupted, remove this deferred from the pollers queue.
1028 core.onInterrupt(() => core.sync(() => unsafeRemove(this.pollers, deferred)))
1029 )
1030 } else {
// A message was polled: notify the strategy via unsafeOnPubSubEmptySpace, then succeed with it.
// NOTE(review): presumably lets the strategy react to space freed in the pubsub — confirm.
1031 this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers)
1032 return core.succeed(message)
1033 }
1034 })
1035 }
1036
1037 get takeAll(): Effect.Effect<Chunk.Chunk<A>> {
1038 return core.suspend(() => {

Callers

nothing calls this directly

Calls 12

addSubscribersFunction · 0.85
syncMethod · 0.80
pipeFunction · 0.70
unsafeRemoveFunction · 0.70
getMethod · 0.65
takeMethod · 0.65
isEmptyMethod · 0.65
pollMethod · 0.65
idMethod · 0.65
offerMethod · 0.65
unsafeCompletePollersMethod · 0.65

Tested by

no test coverage detected