MCPcopy
hub / github.com/Effect-TS/effect / PubSubImpl

Class PubSubImpl

packages/effect/src/internal/pubsub.ts:1115–1256  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

1113
1114/** @internal */
1115class PubSubImpl<in out A> implements PubSub.PubSub<A> {
1116 readonly [queue.EnqueueTypeId] = queue.enqueueVariance
1117 readonly [queue.DequeueTypeId] = queue.dequeueVariance
1118
1119 constructor(
1120 readonly pubsub: AtomicPubSub<A>,
1121 readonly subscribers: Subscribers<A>,
1122 readonly scope: Scope.Scope.Closeable,
1123 readonly shutdownHook: Deferred.Deferred<void>,
1124 readonly shutdownFlag: MutableRef.MutableRef<boolean>,
1125 readonly strategy: PubSubStrategy<A>
1126 ) {}
1127
1128 capacity(): number {
1129 return this.pubsub.capacity
1130 }
1131
1132 get size(): Effect.Effect<number> {
1133 return core.suspend(() =>
1134 MutableRef.get(this.shutdownFlag) ?
1135 core.interrupt :
1136 core.sync(() => this.pubsub.size())
1137 )
1138 }
1139
1140 unsafeSize(): Option.Option<number> {
1141 if (MutableRef.get(this.shutdownFlag)) {
1142 return Option.none()
1143 }
1144 return Option.some(this.pubsub.size())
1145 }
1146
1147 get isFull(): Effect.Effect<boolean> {
1148 return core.map(this.size, (size) => size === this.capacity())
1149 }
1150
1151 get isEmpty(): Effect.Effect<boolean> {
1152 return core.map(this.size, (size) => size === 0)
1153 }
1154
1155 get awaitShutdown(): Effect.Effect<void> {
1156 return core.deferredAwait(this.shutdownHook)
1157 }
1158
1159 get isShutdown(): Effect.Effect<boolean> {
1160 return core.sync(() => MutableRef.get(this.shutdownFlag))
1161 }
1162
1163 get shutdown(): Effect.Effect<void> {
1164 return core.uninterruptible(core.withFiberRuntime((state) => {
1165 pipe(this.shutdownFlag, MutableRef.set(true))
1166 return pipe(
1167 this.scope.close(core.exitInterrupt(state.id())),
1168 core.zipRight(this.strategy.shutdown),
1169 core.whenEffect(core.deferredSucceed(this.shutdownHook, void 0)),
1170 core.asVoid
1171 )
1172 }))

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected