MCPcopy
hub / github.com/Effect-TS/effect / BoundedPubSubPow2

Class BoundedPubSubPow2

packages/effect/src/internal/pubsub.ts:402–494  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

400
401/** @internal */
402class BoundedPubSubPow2<in out A> implements AtomicPubSub<A> {
// Storage for published values; `publish` writes the slot `publisherIndex & mask`.
403 array: Array<A>
// Set to `capacity - 1` by the constructor and ANDed with `publisherIndex` to pick a slot.
// NOTE(review): this only wraps correctly if `capacity` is a power of two, as the
// class name suggests — confirm at the construction site.
404 mask: number
// Advanced by one each time `publish` stores a value; compared against
// `subscribersIndex` by `isEmpty`, `isFull` and `size`.
405 publisherIndex = 0
// Parallel to `array`: `publish` records the `subscriberCount` in effect at
// publish time in the same slot as the value.
406 subscribers: Array<number>
// Read by `publish`/`publishAll` to decide whether values are stored at all.
// Where it is updated is not visible in this excerpt.
407 subscriberCount = 0
// Lower bound of the occupied range: `size()` is `publisherIndex - subscribersIndex`.
408 subscribersIndex = 0
409
/**
 * Allocates the two `capacity`-length backing arrays and derives the slot mask.
 *
 * @param capacity - number of slots in the buffer; also exposed as a readonly property
 * @param replayBuffer - optional buffer that `publish` forwards accepted values to
 */
constructor(readonly capacity: number, readonly replayBuffer: ReplayBuffer<A> | undefined) {
  const length = capacity
  // `mask` is later ANDed with `publisherIndex` to select a slot.
  this.mask = length - 1
  this.subscribers = Array.from({ length })
  this.array = Array.from({ length })
}
415
/**
 * Returns a replay window over the configured replay buffer, or the shared
 * empty window when no replay buffer was supplied.
 */
replayWindow(): ReplayWindow<A> {
  const buffer = this.replayBuffer
  if (buffer) {
    return new ReplayWindowImpl(buffer)
  }
  return emptyReplayWindow
}
419
/** True when the publisher index has not advanced past the subscribers index. */
isEmpty(): boolean {
  const { publisherIndex, subscribersIndex } = this
  return publisherIndex === subscribersIndex
}
423
/** True when the publisher index is exactly `capacity` ahead of the subscribers index. */
isFull(): boolean {
  const limit = this.subscribersIndex + this.capacity
  return this.publisherIndex === limit
}
427
/** Distance between the publisher index and the subscribers index. */
size(): number {
  const { publisherIndex, subscribersIndex } = this
  return publisherIndex - subscribersIndex
}
431
/**
 * Attempts to publish a single value.
 *
 * @param value - the value to publish
 * @returns `false` when the buffer is full (nothing is stored or replayed),
 *          `true` otherwise
 */
publish(value: A): boolean {
  if (this.isFull()) {
    return false
  }

  const activeSubscribers = this.subscriberCount
  if (activeSubscribers !== 0) {
    // Wrap the ever-increasing publisher index into a slot position.
    const slot = this.publisherIndex & this.mask
    this.array[slot] = value
    // Record the subscriber count in effect at publish time alongside the value.
    this.subscribers[slot] = activeSubscribers
    this.publisherIndex += 1
  }

  // The replay buffer receives the value even when there are no subscribers.
  const replay = this.replayBuffer
  if (replay) {
    replay.offer(value)
  }
  return true
}
447
448 publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
449 if (this.subscriberCount === 0) {
450 if (this.replayBuffer) {
451 this.replayBuffer.offerAll(elements)
452 }
453 return Chunk.empty()
454 }
455 const chunk = Chunk.fromIterable(elements)
456 const n = chunk.length
457 const size = this.publisherIndex - this.subscribersIndex
458 const available = this.capacity - size
459 const forPubSub = Math.min(n, available)

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected