(self, event: str, data: dict, sid: str)
| 11 | """Developer-only WebSocket test harness handler.""" |
| 12 | |
| 13 | async def process(self, event: str, data: dict, sid: str) -> dict[str, Any] | WsResult | None: |
| 14 | if event == "ws_event_console_subscribe": |
| 15 | if not runtime.is_development(): |
| 16 | return WsResult.error( |
| 17 | code="NOT_AVAILABLE", |
| 18 | message="Event console is available only in development mode", |
| 19 | ) |
| 20 | registered = self.manager.register_diagnostic_watcher(self.namespace, sid) |
| 21 | if not registered: |
| 22 | return WsResult.error( |
| 23 | code="SUBSCRIBE_FAILED", |
| 24 | message="Unable to subscribe to diagnostics", |
| 25 | ) |
| 26 | return {"status": "subscribed", "timestamp": data.get("requestedAt")} |
| 27 | |
| 28 | if event == "ws_event_console_unsubscribe": |
| 29 | self.manager.unregister_diagnostic_watcher(self.namespace, sid) |
| 30 | return {"status": "unsubscribed"} |
| 31 | |
| 32 | if event == "ws_tester_emit": |
| 33 | message = data.get("message", "emit") |
| 34 | payload = {"message": message, "echo": True, "timestamp": data.get("timestamp")} |
| 35 | await self.broadcast("ws_tester_broadcast", payload) |
| 36 | PrintStyle.info(f"Harness emit broadcasted message='{message}'") |
| 37 | return None |
| 38 | |
| 39 | if event == "ws_tester_request": |
| 40 | value = data.get("value") |
| 41 | PrintStyle.debug("Harness request responded with echo %s", value) |
| 42 | return {"echo": value, "handler": self.identifier, "status": "ok"} |
| 43 | |
| 44 | if event == "ws_tester_request_delayed": |
| 45 | delay_ms = int(data.get("delay_ms", 0)) |
| 46 | await asyncio.sleep(delay_ms / 1000) |
| 47 | PrintStyle.warning("Harness delayed request finished after %s ms", delay_ms) |
| 48 | return {"status": "delayed", "delay_ms": delay_ms, "handler": self.identifier} |
| 49 | |
| 50 | if event == "ws_tester_trigger_persistence": |
| 51 | phase = data.get("phase", "unknown") |
| 52 | payload = {"phase": phase, "handler": self.identifier} |
| 53 | await self.emit_to(sid, "ws_tester_persistence", payload) |
| 54 | PrintStyle.info(f"Harness persistence event phase='{phase}' -> {sid}") |
| 55 | return None |
| 56 | |
| 57 | if event == "ws_tester_broadcast_demo_trigger": |
| 58 | payload = {"demo": True, "requested_at": data.get("requested_at")} |
| 59 | await self.broadcast("ws_tester_broadcast_demo", payload) |
| 60 | PrintStyle.info("Harness broadcast demo event dispatched") |
| 61 | return None |
| 62 | |
| 63 | if event == "ws_tester_request_all": |
| 64 | correlation_id = data.get("correlationId") |
| 65 | aggregated = await self.dispatch_to_all_sids( |
| 66 | "ws_tester_request", |
| 67 | {"value": data.get("marker", "aggregate")}, |
| 68 | correlation_id=correlation_id, |
| 69 | ) |
| 70 | return {"results": aggregated} |
nothing calls this directly
no test coverage detected