MCPcopy Index your code
hub / github.com/coder/mux / runActivitySubscription

Method runActivitySubscription

src/browser/stores/WorkspaceStore.ts:3207–3309  ·  view source on GitHub ↗
(signal: AbortSignal)

Source from the content-addressed store, hash-verified

3205 }
3206
3207 private async runActivitySubscription(signal: AbortSignal): Promise<void> {
3208 let attempt = 0;
3209
3210 while (!signal.aborted) {
3211 const client = this.client ?? (await this.waitForClient(signal));
3212 if (!client || signal.aborted) {
3213 return;
3214 }
3215
3216 const attemptController = new AbortController();
3217 const releaseAttemptListeners = this.linkAttemptToClientChange(attemptController, signal);
3218 const watchdog = this.createStallWatchdog(attemptController, "activity subscription");
3219
3220 try {
3221 // Open the live delta stream first so no state transition can be lost
3222 // between the list snapshot fetch and subscribe registration.
3223 const iterator = await client.workspace.activity.subscribe(undefined, {
3224 signal: attemptController.signal,
3225 });
3226
3227 const snapshots = await client.workspace.activity.list();
3228 if (signal.aborted) {
3229 return;
3230 }
3231 // Client changed while list() was in flight — retry with the new client
3232 // instead of exiting permanently. The outer while loop will pick up the
3233 // replacement client on the next iteration.
3234 if (attemptController.signal.aborted) {
3235 continue;
3236 }
3237
3238 queueMicrotask(() => {
3239 if (signal.aborted || attemptController.signal.aborted) {
3240 return;
3241 }
3242 this.applyWorkspaceActivityList(snapshots);
3243 this.markActivityHydrated();
3244 });
3245
3246 // Start watchdog after bootstrap so slow list() doesn't trigger
3247 // false-positive reconnects.
3248 watchdog.start();
3249
3250 for await (const event of iterator) {
3251 if (signal.aborted) {
3252 return;
3253 }
3254
3255 watchdog.markEvent();
3256
3257 // Connection is alive again - don't carry old backoff into the next failure.
3258 attempt = 0;
3259
3260 if (event.type === "heartbeat") {
3261 continue;
3262 }
3263
3264 queueMicrotask(() => {

Callers 1

Calls 13

waitForClient (Method) · 0.95
createStallWatchdog (Method) · 0.95
markActivityHydrated (Method) · 0.95
sleepWithAbort (Method) · 0.95
isAbortError (Function) · 0.90
subscribe (Method) · 0.65
list (Method) · 0.45
start (Method) · 0.45

Tested by

no test coverage detected