MCPcopy
hub / github.com/Effect-TS/effect / UnboundedPubSub

Class UnboundedPubSub

packages/effect/src/internal/pubsub.ts:731–808  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

729
730/** @internal */
731class UnboundedPubSub<in out A> implements AtomicPubSub<A> {
732 publisherHead: Node<A> = {
733 value: AbsentValue,
734 subscribers: 0,
735 next: null
736 }
737 publisherTail = this.publisherHead
738 publisherIndex = 0
739 subscribersIndex = 0
740
741 readonly capacity = Number.MAX_SAFE_INTEGER
742 constructor(readonly replayBuffer: ReplayBuffer<A> | undefined) {}
743
/**
 * Returns a `ReplayWindowImpl` over the configured replay buffer, or the
 * shared `emptyReplayWindow` when no replay buffer was supplied.
 */
replayWindow(): ReplayWindow<A> {
  if (this.replayBuffer) {
    return new ReplayWindowImpl(this.replayBuffer)
  }
  return emptyReplayWindow
}
747
/**
 * Reports whether the head and tail references point at the same node.
 */
isEmpty(): boolean {
  const { publisherHead, publisherTail } = this
  return publisherTail === publisherHead
}
751
/**
 * Always `false`: this implementation never reports itself as full.
 */
isFull(): boolean {
  return false
}
755
/**
 * Returns the difference between `publisherIndex` and `subscribersIndex`.
 */
size(): number {
  const upper = this.publisherIndex
  const lower = this.subscribersIndex
  return upper - lower
}
759
/**
 * Publishes a single value.
 *
 * A new node is linked after the current tail only when the tail's
 * `subscribers` count is non-zero; the new node inherits that count and
 * `publisherIndex` is advanced by one. Independently of that, the value is
 * offered to the replay buffer when one is configured.
 *
 * @param value - the value to publish
 * @returns always `true`
 */
publish(value: A): boolean {
  const tail = this.publisherTail
  const subscribers = tail.subscribers
  if (subscribers !== 0) {
    // Link the new node first, then move the tail reference onto it.
    tail.next = { value, subscribers, next: null }
    this.publisherTail = tail.next
    this.publisherIndex += 1
  }
  const replay = this.replayBuffer
  if (replay) {
    replay.offer(value)
  }
  return true
}
776
/**
 * Publishes every element of `elements`.
 *
 * When the tail's `subscribers` count is zero, the elements are only
 * offered in bulk to the replay buffer (if one is configured). Otherwise
 * each element goes through `publish`, which also feeds the replay buffer.
 *
 * @param elements - the values to publish
 * @returns always an empty chunk
 */
publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
  if (this.publisherTail.subscribers === 0) {
    if (this.replayBuffer) {
      this.replayBuffer.offerAll(elements)
    }
    return Chunk.empty()
  }
  for (const element of elements) {
    this.publish(element)
  }
  return Chunk.empty()
}
787
788 slide(): void {

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected