(newTransport: ReplBridgeTransport)
| 1206 | // Extracted so the (sync) v1 and (async) v2 construction paths can |
| 1207 | // share the identical callback + flush machinery. |
| 1208 | const wireTransport = (newTransport: ReplBridgeTransport): void => { |
| 1209 | transport = newTransport |
| 1210 | |
| 1211 | newTransport.setOnConnect(() => { |
| 1212 | // Guard: if transport was replaced by a newer onWorkReceived call |
| 1213 | // while the WS was connecting, ignore this stale callback. |
| 1214 | if (transport !== newTransport) return |
| 1215 | |
| 1216 | logForDebugging('[bridge:repl] Ingress transport connected') |
| 1217 | logEvent('tengu_bridge_repl_ws_connected', {}) |
| 1218 | |
| 1219 | // Update the env var with the latest OAuth token so POST writes |
| 1220 | // (which read via getSessionIngressAuthToken()) use a fresh token. |
| 1221 | // v2 skips this — createV2ReplTransport already stored the JWT, |
| 1222 | // and overwriting it with OAuth would break subsequent /worker/* |
| 1223 | // requests (session_id claim check). |
| 1224 | if (!useCcrV2) { |
| 1225 | const freshToken = getOAuthToken() |
| 1226 | if (freshToken) { |
| 1227 | updateSessionIngressAuthToken(freshToken) |
| 1228 | } |
| 1229 | } |
| 1230 | |
| 1231 | // Reset teardownStarted so future teardowns are not blocked. |
| 1232 | teardownStarted = false |
| 1233 | |
| 1234 | // Flush initial messages only on first connect, not on every |
| 1235 | // WS reconnection. Re-flushing would cause duplicate messages. |
| 1236 | // IMPORTANT: onStateChange('connected') is deferred until the |
| 1237 | // flush completes. This prevents writeMessages() from sending |
| 1238 | // new messages that could arrive at the server interleaved with |
| 1239 | // the historical messages, and keeps the web UI from showing |
| 1240 | // the session as active until history is persisted. |
| 1241 | if ( |
| 1242 | !initialFlushDone && |
| 1243 | initialMessages && |
| 1244 | initialMessages.length > 0 |
| 1245 | ) { |
| 1246 | initialFlushDone = true |
| 1247 | |
| 1248 | // Cap the initial flush to the most recent N messages. The full |
| 1249 | // history is UI-only (model doesn't see it) and large replays cause |
| 1250 | // slow session-ingress persistence (each event is a threadstore write) |
| 1251 | // plus elevated Firestore pressure. A cap of 0 or less disables capping. |
| 1252 | const historyCap = initialHistoryCap |
| 1253 | const eligibleMessages = initialMessages.filter( |
| 1254 | m => |
| 1255 | isEligibleBridgeMessage(m) && |
| 1256 | !previouslyFlushedUUIDs?.has(m.uuid), |
| 1257 | ) |
| 1258 | const cappedMessages = |
| 1259 | historyCap > 0 && eligibleMessages.length > historyCap |
| 1260 | ? eligibleMessages.slice(-historyCap) |
| 1261 | : eligibleMessages |
| 1262 | if (cappedMessages.length < eligibleMessages.length) { |
| 1263 | logForDebugging( |
| 1264 | `[bridge:repl] Capped initial flush: ${eligibleMessages.length} -> ${cappedMessages.length} (cap=${historyCap})`, |
| 1265 | ) |
no test coverage detected