@internal
| 911 | |
| 912 | /** @internal */ |
| 913 | class SubscriptionImpl<in out A> extends Effectable.Class<A> implements Queue.Dequeue<A> { |
| 914 | [queue.DequeueTypeId] = queue.dequeueVariance |
| 915 | |
/**
 * Creates the subscription. Every parameter is declared `readonly`, so each
 * one becomes a same-named read-only property on the instance; the body does
 * nothing beyond calling the superclass constructor.
 *
 * Of the members visible in this part of the class:
 * - `pubsub` supplies the value returned by `capacity()`.
 * - `subscription` supplies the `size()` used by `size`, `unsafeSize` and `isFull`.
 * - `shutdownFlag` is read by `isActive`, `size`, `unsafeSize` and `isFull`.
 * - `replayWindow` supplies the `remaining` count added in `size` and `unsafeSize`.
 *
 * NOTE(review): `subscribers`, `pollers`, `shutdownHook` and `strategy` are not
 * used by the members shown here — presumably consumed further down the class.
 */
| 916 | constructor( |
| 917 | readonly pubsub: AtomicPubSub<A>, |
| 918 | readonly subscribers: Subscribers<A>, |
| 919 | readonly subscription: Subscription<A>, |
| 920 | readonly pollers: MutableQueue.MutableQueue<Deferred.Deferred<A>>, |
| 921 | readonly shutdownHook: Deferred.Deferred<void>, |
| 922 | readonly shutdownFlag: MutableRef.MutableRef<boolean>, |
| 923 | readonly strategy: PubSubStrategy<A>, |
| 924 | readonly replayWindow: ReplayWindow<A> |
| 925 | ) { |
| 926 | super() |
| 927 | } |
| 928 | |
/**
 * Returns this instance's `take` member unchanged.
 *
 * NOTE(review): presumably this is the hook `Effectable.Class` uses to run the
 * subscription itself as an effect — confirm against the base class.
 */
commit() {
  const takeEffect = this.take
  return takeEffect
}
| 932 | |
/**
 * Forwards this instance together with the call's implicit `arguments` object
 * to `pipeArguments` and returns whatever that produces.
 */
pipe() {
  // `arguments` must stay the implicit object of this method: the signature
  // declares no parameters, so it is the only handle on what the caller passed.
  const piped = pipeArguments(this, arguments)
  return piped
}
| 936 | |
/**
 * Reports the `capacity` of the underlying `pubsub`.
 *
 * @returns the value of `this.pubsub.capacity`
 */
capacity(): number {
  const { capacity } = this.pubsub
  return capacity
}
| 940 | |
/**
 * Tells whether the subscription is still active.
 *
 * @returns `true` while the shutdown flag is unset, `false` once it is set
 */
isActive(): boolean {
  const shutDown = MutableRef.get(this.shutdownFlag)
  return !shutDown
}
| 944 | |
/**
 * Effect describing how many elements this subscription currently accounts for.
 *
 * The computation is wrapped in `core.suspend`, so the shutdown flag is
 * presumably read when the effect runs rather than when the getter is accessed.
 *
 * @returns `core.interrupt` when the shutdown flag is set; otherwise an effect
 * succeeding with `subscription.size()` plus `replayWindow.remaining`
 */
get size(): Effect.Effect<number> {
  return core.suspend(() => {
    if (MutableRef.get(this.shutdownFlag)) {
      return core.interrupt
    }
    // Read the subscription first, then the replay window, keeping the
    // left-to-right order of the original sum.
    const pending = this.subscription.size()
    const replayable = this.replayWindow.remaining
    return core.succeed(pending + replayable)
  })
}
| 952 | |
/**
 * Synchronous counterpart of `size`.
 *
 * @returns `Option.none()` when the shutdown flag is set; otherwise
 * `Option.some` of `subscription.size()` plus `replayWindow.remaining`
 */
unsafeSize(): Option.Option<number> {
  const shutDown = MutableRef.get(this.shutdownFlag)
  return shutDown
    ? Option.none()
    : Option.some(this.subscription.size() + this.replayWindow.remaining)
}
| 959 | |
/**
 * Effect describing whether the subscription has reached capacity.
 *
 * Only `subscription.size()` is compared against `capacity()`; unlike `size`,
 * the replay window's `remaining` count is not added here.
 * NOTE(review): confirm that leaving the replay window out is intended.
 *
 * @returns `core.interrupt` when the shutdown flag is set; otherwise an effect
 * succeeding with `true` exactly when `subscription.size()` equals `capacity()`
 */
get isFull(): Effect.Effect<boolean> {
  return core.suspend(() => {
    if (MutableRef.get(this.shutdownFlag)) {
      return core.interrupt
    }
    const atCapacity = this.subscription.size() === this.capacity()
    return core.succeed(atCapacity)
  })
}
| 967 | |
/**
 * Effect describing whether the subscription currently holds nothing.
 *
 * Built by mapping over `size`, so it reflects the same count (subscription
 * size plus replay-window remainder) and goes through the same shutdown check.
 *
 * @returns an effect yielding `true` exactly when `size` yields `0`
 */
get isEmpty(): Effect.Effect<boolean> {
  const isZero = (count: number): boolean => count === 0
  return core.map(this.size, isZero)
}
nothing calls this directly
no outgoing calls
no test coverage detected
searching dependent graphs…