@internal
| 1113 | |
| 1114 | /** @internal */ |
| 1115 | class PubSubImpl<in out A> implements PubSub.PubSub<A> { |
| 1116 | readonly [queue.EnqueueTypeId] = queue.enqueueVariance |
| 1117 | readonly [queue.DequeueTypeId] = queue.dequeueVariance |
| 1118 | |
| 1119 | constructor( |
| 1120 | readonly pubsub: AtomicPubSub<A>, |
| 1121 | readonly subscribers: Subscribers<A>, |
| 1122 | readonly scope: Scope.Scope.Closeable, |
| 1123 | readonly shutdownHook: Deferred.Deferred<void>, |
| 1124 | readonly shutdownFlag: MutableRef.MutableRef<boolean>, |
| 1125 | readonly strategy: PubSubStrategy<A> |
| 1126 | ) {} |
| 1127 | |
| 1128 | capacity(): number { |
| 1129 | return this.pubsub.capacity |
| 1130 | } |
| 1131 | |
| 1132 | get size(): Effect.Effect<number> { |
| 1133 | return core.suspend(() => |
| 1134 | MutableRef.get(this.shutdownFlag) ? |
| 1135 | core.interrupt : |
| 1136 | core.sync(() => this.pubsub.size()) |
| 1137 | ) |
| 1138 | } |
| 1139 | |
| 1140 | unsafeSize(): Option.Option<number> { |
| 1141 | if (MutableRef.get(this.shutdownFlag)) { |
| 1142 | return Option.none() |
| 1143 | } |
| 1144 | return Option.some(this.pubsub.size()) |
| 1145 | } |
| 1146 | |
| 1147 | get isFull(): Effect.Effect<boolean> { |
| 1148 | return core.map(this.size, (size) => size === this.capacity()) |
| 1149 | } |
| 1150 | |
| 1151 | get isEmpty(): Effect.Effect<boolean> { |
| 1152 | return core.map(this.size, (size) => size === 0) |
| 1153 | } |
| 1154 | |
| 1155 | get awaitShutdown(): Effect.Effect<void> { |
| 1156 | return core.deferredAwait(this.shutdownHook) |
| 1157 | } |
| 1158 | |
| 1159 | get isShutdown(): Effect.Effect<boolean> { |
| 1160 | return core.sync(() => MutableRef.get(this.shutdownFlag)) |
| 1161 | } |
| 1162 | |
| 1163 | get shutdown(): Effect.Effect<void> { |
| 1164 | return core.uninterruptible(core.withFiberRuntime((state) => { |
| 1165 | pipe(this.shutdownFlag, MutableRef.set(true)) |
| 1166 | return pipe( |
| 1167 | this.scope.close(core.exitInterrupt(state.id())), |
| 1168 | core.zipRight(this.strategy.shutdown), |
| 1169 | core.whenEffect(core.deferredSucceed(this.shutdownHook, void 0)), |
| 1170 | core.asVoid |
| 1171 | ) |
| 1172 | })) |
nothing calls this directly
no outgoing calls
no test coverage detected