(
fresh: RemoteCredentials,
cause: Exclude<ConnectCause, 'initial'>,
)
| 502 | // the flag — moving it here would be too late to prevent a double /bridge |
| 503 | // fetch, and each fetch bumps epoch. |
| 504 | async function rebuildTransport( |
| 505 | fresh: RemoteCredentials, |
| 506 | cause: Exclude<ConnectCause, 'initial'>, |
| 507 | ): Promise<void> { |
| 508 | connectCause = cause |
| 509 | // Queue writes during rebuild — once /bridge returns, the old transport's |
| 510 | // epoch is stale and its next write/heartbeat 409s. Without this gate, |
| 511 | // writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently |
| 512 | // no-ops (closed uploader after 409) → permanent silent message loss. |
| 513 | flushGate.start() |
| 514 | try { |
| 515 | const seq = transport.getLastSequenceNum() |
| 516 | transport.close() |
| 517 | transport = await createV2ReplTransport({ |
| 518 | sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId), |
| 519 | ingressToken: fresh.worker_jwt, |
| 520 | sessionId, |
| 521 | epoch: fresh.worker_epoch, |
| 522 | heartbeatIntervalMs: cfg.heartbeat_interval_ms, |
| 523 | heartbeatJitterFraction: cfg.heartbeat_jitter_fraction, |
| 524 | initialSequenceNum: seq, |
| 525 | getAuthToken: () => fresh.worker_jwt, |
| 526 | outboundOnly, |
| 527 | }) |
| 528 | if (tornDown) { |
| 529 | // Teardown fired during the async createV2ReplTransport window. |
| 530 | // Don't wire/connect/schedule — we'd re-arm timers after cancelAll() |
| 531 | // and fire onInboundMessage into a torn-down bridge. |
| 532 | transport.close() |
| 533 | return |
| 534 | } |
| 535 | wireTransportCallbacks() |
| 536 | transport.connect() |
| 537 | connectDeadline = setTimeout( |
| 538 | onConnectTimeout, |
| 539 | cfg.connect_timeout_ms, |
| 540 | connectCause, |
| 541 | ) |
| 542 | refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in) |
| 543 | // Drain queued writes into the new uploader. Runs before |
| 544 | // ccr.initialize() resolves (transport.connect() is fire-and-forget), |
| 545 | // but the uploader serializes behind the initial PUT /worker. If |
| 546 | // init fails (4091), events drop — but only recentPostedUUIDs |
| 547 | // (per-instance) is populated, so re-enabling the bridge re-flushes. |
| 548 | drainFlushGate() |
| 549 | } finally { |
| 550 | // End the gate on failure paths too — drainFlushGate already ended |
| 551 | // it on success. Queued messages are dropped (transport still dead). |
| 552 | flushGate.drop() |
| 553 | } |
| 554 | } |
| 555 | |
| 556 | // ── 8. 401 recovery (OAuth refresh + rebuild) ─────────────────────────── |
| 557 | async function recoverFromAuthFailure(): Promise<void> { |
no test coverage detected