(
fresh: RemoteCredentials,
cause: Exclude<ConnectCause, 'initial'>,
)
| 475 | // the flag — moving it here would be too late to prevent a double /bridge |
| 476 | // fetch, and each fetch bumps epoch. |
| 477 | async function rebuildTransport( |
| 478 | fresh: RemoteCredentials, |
| 479 | cause: Exclude<ConnectCause, 'initial'>, |
| 480 | ): Promise<void> { |
| 481 | connectCause = cause |
| 482 | // Queue writes during rebuild — once /bridge returns, the old transport's |
| 483 | // epoch is stale and its next write/heartbeat 409s. Without this gate, |
| 484 | // writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently |
| 485 | // no-ops (closed uploader after 409) → permanent silent message loss. |
| 486 | flushGate.start() |
| 487 | try { |
| 488 | const seq = transport.getLastSequenceNum() |
| 489 | transport.close() |
| 490 | transport = await createV2ReplTransport({ |
| 491 | sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId), |
| 492 | ingressToken: fresh.worker_jwt, |
| 493 | sessionId, |
| 494 | epoch: fresh.worker_epoch, |
| 495 | heartbeatIntervalMs: cfg.heartbeat_interval_ms, |
| 496 | heartbeatJitterFraction: cfg.heartbeat_jitter_fraction, |
| 497 | initialSequenceNum: seq, |
| 498 | getAuthToken: () => fresh.worker_jwt, |
| 499 | outboundOnly, |
| 500 | }) |
| 501 | if (tornDown) { |
| 502 | // Teardown fired during the async createV2ReplTransport window. |
| 503 | // Don't wire/connect/schedule — we'd re-arm timers after cancelAll() |
| 504 | // and fire onInboundMessage into a torn-down bridge. |
| 505 | transport.close() |
| 506 | return |
| 507 | } |
| 508 | wireTransportCallbacks() |
| 509 | transport.connect() |
| 510 | connectDeadline = setTimeout( |
| 511 | onConnectTimeout, |
| 512 | cfg.connect_timeout_ms, |
| 513 | connectCause, |
| 514 | ) |
| 515 | refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in) |
| 516 | // Drain queued writes into the new uploader. Runs before |
| 517 | // ccr.initialize() resolves (transport.connect() is fire-and-forget), |
| 518 | // but the uploader serializes behind the initial PUT /worker. If |
| 519 | // init fails (4091), events drop — but only recentPostedUUIDs |
| 520 | // (per-instance) is populated, so re-enabling the bridge re-flushes. |
| 521 | drainFlushGate() |
| 522 | } finally { |
| 523 | // End the gate on failure paths too — drainFlushGate already ended |
| 524 | // it on success. Queued messages are dropped (transport still dead). |
| 525 | flushGate.drop() |
| 526 | } |
| 527 | } |
| 528 | |
| 529 | // ── 8. 401 recovery (OAuth refresh + rebuild) ─────────────────────────── |
| 530 | async function recoverFromAuthFailure(): Promise<void> { |
no test coverage detected