MCPcopy Index your code
hub / github.com/Effect-TS/effect / BoundedPubSubSingleSubscription

Class BoundedPubSubSingleSubscription

packages/effect/src/internal/pubsub.ts:663–721  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

661
662/** @internal */
663class BoundedPubSubSingleSubscription<in out A> implements Subscription<A> {
664 constructor(
665 private self: BoundedPubSubSingle<A>,
666 private subscriberIndex: number,
667 private unsubscribed: boolean
668 ) {
669 }
670
671 isEmpty(): boolean {
672 return (
673 this.unsubscribed ||
674 this.self.subscribers === 0 ||
675 this.subscriberIndex === this.self.publisherIndex
676 )
677 }
678
679 size() {
680 return this.isEmpty() ? 0 : 1
681 }
682
683 poll<D>(default_: D): A | D {
684 if (this.isEmpty()) {
685 return default_
686 }
687 const elem = this.self.value
688 this.self.subscribers -= 1
689 if (this.self.subscribers === 0) {
690 this.self.value = AbsentValue as unknown as A
691 }
692 this.subscriberIndex += 1
693 return elem
694 }
695
696 pollUpTo(n: number): Chunk.Chunk<A> {
697 if (this.isEmpty() || n < 1) {
698 return Chunk.empty()
699 }
700 const a = this.self.value
701 this.self.subscribers -= 1
702 if (this.self.subscribers === 0) {
703 this.self.value = AbsentValue as unknown as A
704 }
705 this.subscriberIndex += 1
706 return Chunk.of(a)
707 }
708
709 unsubscribe(): void {
710 if (!this.unsubscribed) {
711 this.unsubscribed = true
712 this.self.subscriberCount -= 1
713 if (this.subscriberIndex !== this.self.publisherIndex) {
714 this.self.subscribers -= 1
715 if (this.self.subscribers === 0) {
716 this.self.value = AbsentValue as unknown as A
717 }
718 }
719 }
720 }

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected