(signal: AbortSignal)
| 3205 | } |
| 3206 | |
| 3207 | private async runActivitySubscription(signal: AbortSignal): Promise<void> { |
| 3208 | let attempt = 0; |
| 3209 | |
| 3210 | while (!signal.aborted) { |
| 3211 | const client = this.client ?? (await this.waitForClient(signal)); |
| 3212 | if (!client || signal.aborted) { |
| 3213 | return; |
| 3214 | } |
| 3215 | |
| 3216 | const attemptController = new AbortController(); |
| 3217 | const releaseAttemptListeners = this.linkAttemptToClientChange(attemptController, signal); |
| 3218 | const watchdog = this.createStallWatchdog(attemptController, "activity subscription"); |
| 3219 | |
| 3220 | try { |
| 3221 | // Open the live delta stream first so no state transition can be lost |
| 3222 | // between the list snapshot fetch and subscribe registration. |
| 3223 | const iterator = await client.workspace.activity.subscribe(undefined, { |
| 3224 | signal: attemptController.signal, |
| 3225 | }); |
| 3226 | |
| 3227 | const snapshots = await client.workspace.activity.list(); |
| 3228 | if (signal.aborted) { |
| 3229 | return; |
| 3230 | } |
| 3231 | // Client changed while list() was in flight — retry with the new client |
| 3232 | // instead of exiting permanently. The outer while loop will pick up the |
| 3233 | // replacement client on the next iteration. |
| 3234 | if (attemptController.signal.aborted) { |
| 3235 | continue; |
| 3236 | } |
| 3237 | |
| 3238 | queueMicrotask(() => { |
| 3239 | if (signal.aborted || attemptController.signal.aborted) { |
| 3240 | return; |
| 3241 | } |
| 3242 | this.applyWorkspaceActivityList(snapshots); |
| 3243 | this.markActivityHydrated(); |
| 3244 | }); |
| 3245 | |
| 3246 | // Start watchdog after bootstrap so slow list() doesn't trigger |
| 3247 | // false-positive reconnects. |
| 3248 | watchdog.start(); |
| 3249 | |
| 3250 | for await (const event of iterator) { |
| 3251 | if (signal.aborted) { |
| 3252 | return; |
| 3253 | } |
| 3254 | |
| 3255 | watchdog.markEvent(); |
| 3256 | |
| 3257 | // Connection is alive again - don't carry old backoff into the next failure. |
| 3258 | attempt = 0; |
| 3259 | |
| 3260 | if (event.type === "heartbeat") { |
| 3261 | continue; |
| 3262 | } |
| 3263 | |
| 3264 | queueMicrotask(() => { |
no test coverage detected