MCPcopy Index your code
hub / github.com/Effect-TS/effect / UnboundedPubSubSubscription

Class UnboundedPubSubSubscription

packages/effect/src/internal/pubsub.ts:811–910  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

809
810/** @internal */
811class UnboundedPubSubSubscription<in out A> implements Subscription<A> {
/**
 * Builds a subscription cursor over an unbounded pub/sub.
 *
 * @param self - The pub/sub whose publisher head/tail and index counters
 *   this subscription reads (and, when polling, updates).
 * @param subscriberHead - The node this subscription is currently
 *   positioned at; the node after it is the next candidate element.
 * @param subscriberIndex - Running position counter, incremented each time
 *   the cursor steps forward over a node.
 * @param unsubscribed - When `true`, the subscription reports itself as
 *   empty, has size `0`, and polling yields the supplied default.
 */
constructor(
  private self: UnboundedPubSub<A>,
  private subscriberHead: Node<A>,
  private subscriberIndex: number,
  private unsubscribed: boolean
) {}
819
/**
 * Reports whether this subscription currently has nothing to consume.
 *
 * An unsubscribed subscription is always empty. Otherwise the cursor walks
 * forward from `subscriberHead` towards the publisher's tail, stepping over
 * nodes whose value is `AbsentValue` (bumping `subscriberIndex` for each
 * one), and stops at the first node that still holds a value.
 *
 * Note that this is not a pure query: skipped nodes permanently advance
 * `subscriberHead` and `subscriberIndex`.
 *
 * @returns `true` if unsubscribed or if the cursor reached the publisher's
 *   tail without finding a present value; `false` otherwise.
 */
isEmpty(): boolean {
  if (this.unsubscribed) {
    return true
  }
  // Keep going until the cursor has caught up with the publisher's tail.
  while (this.subscriberHead !== this.self.publisherTail) {
    const following = this.subscriberHead.next!
    if (following.value !== AbsentValue) {
      // A real element is waiting right after the cursor.
      return false
    }
    // The next slot holds no value: step over it and count the move.
    this.subscriberHead = following
    this.subscriberIndex += 1
  }
  return true
}
841
/**
 * Number of elements pending for this subscription.
 *
 * Returns `0` once unsubscribed. Otherwise it is the publisher's
 * `publisherIndex` minus the larger of this subscription's own
 * `subscriberIndex` and the pub/sub-wide `subscribersIndex`.
 *
 * @returns The pending element count as computed above.
 */
size() {
  if (this.unsubscribed) return 0
  const { publisherIndex, subscribersIndex } = this.self
  // Measure from whichever counter is further along.
  const position = Math.max(this.subscriberIndex, subscribersIndex)
  return publisherIndex - position
}
848
849 poll<D>(default_: D): A | D {
850 if (this.unsubscribed) {
851 return default_
852 }
853 let loop = true
854 let polled: A | D = default_
855 while (loop) {
856 if (this.subscriberHead === this.self.publisherTail) {
857 loop = false
858 } else {
859 const elem = this.subscriberHead.next!.value
860 if (elem !== AbsentValue) {
861 polled = elem
862 this.subscriberHead.subscribers -= 1
863 if (this.subscriberHead.subscribers === 0) {
864 this.self.publisherHead = this.self.publisherHead.next!
865 this.self.publisherHead.value = AbsentValue
866 this.self.subscribersIndex += 1
867 }
868 loop = false

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected