MCPcopy
hub / github.com/Effect-TS/effect / BoundedPubSubArb

Class BoundedPubSubArb

packages/effect/src/internal/pubsub.ts:222–312  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

220
221/** @internal */
222class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
223 array: Array<A>
224 publisherIndex = 0
225 subscribers: Array<number>
226 subscriberCount = 0
227 subscribersIndex = 0
228
229 constructor(readonly capacity: number, readonly replayBuffer: ReplayBuffer<A> | undefined) {
230 this.array = Array.from({ length: capacity })
231 this.subscribers = Array.from({ length: capacity })
232 }
233
234 replayWindow(): ReplayWindow<A> {
235 return this.replayBuffer ? new ReplayWindowImpl(this.replayBuffer) : emptyReplayWindow
236 }
237
238 isEmpty(): boolean {
239 return this.publisherIndex === this.subscribersIndex
240 }
241
242 isFull(): boolean {
243 return this.publisherIndex === this.subscribersIndex + this.capacity
244 }
245
246 size(): number {
247 return this.publisherIndex - this.subscribersIndex
248 }
249
250 publish(value: A): boolean {
251 if (this.isFull()) {
252 return false
253 }
254 if (this.subscriberCount !== 0) {
255 const index = this.publisherIndex % this.capacity
256 this.array[index] = value
257 this.subscribers[index] = this.subscriberCount
258 this.publisherIndex += 1
259 }
260 if (this.replayBuffer) {
261 this.replayBuffer.offer(value)
262 }
263 return true
264 }
265
266 publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
267 if (this.subscriberCount === 0) {
268 if (this.replayBuffer) {
269 this.replayBuffer.offerAll(elements)
270 }
271 return Chunk.empty()
272 }
273 const chunk = Chunk.fromIterable(elements)
274 const n = chunk.length
275 const size = this.publisherIndex - this.subscribersIndex
276 const available = this.capacity - size
277 const forPubSub = Math.min(n, available)
278 if (forPubSub === 0) {
279 return chunk

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected