MCPcopy Index your code
hub / github.com/Effect-TS/effect / BoundedPubSubArbSubscription

Class BoundedPubSubArbSubscription

packages/effect/src/internal/pubsub.ts:314–399  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

312}
313
314class BoundedPubSubArbSubscription<in out A> implements Subscription<A> {
315 constructor(
316 private self: BoundedPubSubArb<A>,
317 private subscriberIndex: number,
318 private unsubscribed: boolean
319 ) {
320 }
321
322 isEmpty(): boolean {
323 return (
324 this.unsubscribed ||
325 this.self.publisherIndex === this.subscriberIndex ||
326 this.self.publisherIndex === this.self.subscribersIndex
327 )
328 }
329
330 size() {
331 if (this.unsubscribed) {
332 return 0
333 }
334 return this.self.publisherIndex - Math.max(this.subscriberIndex, this.self.subscribersIndex)
335 }
336
337 poll<D>(default_: D): A | D {
338 if (this.unsubscribed) {
339 return default_
340 }
341 this.subscriberIndex = Math.max(this.subscriberIndex, this.self.subscribersIndex)
342 if (this.subscriberIndex !== this.self.publisherIndex) {
343 const index = this.subscriberIndex % this.self.capacity
344 const elem = this.self.array[index]!
345 this.self.subscribers[index] -= 1
346 if (this.self.subscribers[index] === 0) {
347 this.self.array[index] = AbsentValue as unknown as A
348 this.self.subscribersIndex += 1
349 }
350 this.subscriberIndex += 1
351 return elem
352 }
353 return default_
354 }
355
356 pollUpTo(n: number): Chunk.Chunk<A> {
357 if (this.unsubscribed) {
358 return Chunk.empty()
359 }
360 this.subscriberIndex = Math.max(this.subscriberIndex, this.self.subscribersIndex)
361 const size = this.self.publisherIndex - this.subscriberIndex
362 const toPoll = Math.min(n, size)
363 if (toPoll <= 0) {
364 return Chunk.empty()
365 }
366 const builder: Array<A> = []
367 const pollUpToIndex = this.subscriberIndex + toPoll
368 while (this.subscriberIndex !== pollUpToIndex) {
369 const index = this.subscriberIndex % this.self.capacity
370 const a = this.self.array[index] as A
371 this.self.subscribers[index] -= 1

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected