MCPcopy
hub / github.com/Effect-TS/effect / SubscriptionImpl

Class SubscriptionImpl

packages/effect/src/internal/pubsub.ts:913–1077  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

911
912/** @internal */
913class SubscriptionImpl<in out A> extends Effectable.Class<A> implements Queue.Dequeue<A> {
914 [queue.DequeueTypeId] = queue.dequeueVariance
915
916 constructor(
917 readonly pubsub: AtomicPubSub<A>,
918 readonly subscribers: Subscribers<A>,
919 readonly subscription: Subscription<A>,
920 readonly pollers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
921 readonly shutdownHook: Deferred.Deferred<void>,
922 readonly shutdownFlag: MutableRef.MutableRef<boolean>,
923 readonly strategy: PubSubStrategy<A>,
924 readonly replayWindow: ReplayWindow<A>
925 ) {
926 super()
927 }
928
929 commit() {
930 return this.take
931 }
932
933 pipe() {
934 return pipeArguments(this, arguments)
935 }
936
937 capacity(): number {
938 return this.pubsub.capacity
939 }
940
941 isActive(): boolean {
942 return !MutableRef.get(this.shutdownFlag)
943 }
944
945 get size(): Effect.Effect<number> {
946 return core.suspend(() =>
947 MutableRef.get(this.shutdownFlag)
948 ? core.interrupt
949 : core.succeed(this.subscription.size() + this.replayWindow.remaining)
950 )
951 }
952
953 unsafeSize(): Option.Option<number> {
954 if (MutableRef.get(this.shutdownFlag)) {
955 return Option.none()
956 }
957 return Option.some(this.subscription.size() + this.replayWindow.remaining)
958 }
959
960 get isFull(): Effect.Effect<boolean> {
961 return core.suspend(() =>
962 MutableRef.get(this.shutdownFlag)
963 ? core.interrupt
964 : core.succeed(this.subscription.size() === this.capacity())
965 )
966 }
967
968 get isEmpty(): Effect.Effect<boolean> {
969 return core.map(this.size, (size) => size === 0)
970 }

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…