MCPcopy Index your code
hub / github.com/agent0ai/agent-zero / process

Method process

api/ws_dev_test.py:13–77  ·  view source on GitHub ↗
(self, event: str, data: dict, sid: str)

Source from the content-addressed store, hash-verified

11 """Developer-only WebSocket test harness handler."""
12
13 async def process(self, event: str, data: dict, sid: str) -> dict[str, Any] | WsResult | None:
14 if event == "ws_event_console_subscribe":
15 if not runtime.is_development():
16 return WsResult.error(
17 code="NOT_AVAILABLE",
18 message="Event console is available only in development mode",
19 )
20 registered = self.manager.register_diagnostic_watcher(self.namespace, sid)
21 if not registered:
22 return WsResult.error(
23 code="SUBSCRIBE_FAILED",
24 message="Unable to subscribe to diagnostics",
25 )
26 return {"status": "subscribed", "timestamp": data.get("requestedAt")}
27
28 if event == "ws_event_console_unsubscribe":
29 self.manager.unregister_diagnostic_watcher(self.namespace, sid)
30 return {"status": "unsubscribed"}
31
32 if event == "ws_tester_emit":
33 message = data.get("message", "emit")
34 payload = {"message": message, "echo": True, "timestamp": data.get("timestamp")}
35 await self.broadcast("ws_tester_broadcast", payload)
36 PrintStyle.info(f"Harness emit broadcasted message='{message}'")
37 return None
38
39 if event == "ws_tester_request":
40 value = data.get("value")
41 PrintStyle.debug("Harness request responded with echo %s", value)
42 return {"echo": value, "handler": self.identifier, "status": "ok"}
43
44 if event == "ws_tester_request_delayed":
45 delay_ms = int(data.get("delay_ms", 0))
46 await asyncio.sleep(delay_ms / 1000)
47 PrintStyle.warning("Harness delayed request finished after %s ms", delay_ms)
48 return {"status": "delayed", "delay_ms": delay_ms, "handler": self.identifier}
49
50 if event == "ws_tester_trigger_persistence":
51 phase = data.get("phase", "unknown")
52 payload = {"phase": phase, "handler": self.identifier}
53 await self.emit_to(sid, "ws_tester_persistence", payload)
54 PrintStyle.info(f"Harness persistence event phase='{phase}' -> {sid}")
55 return None
56
57 if event == "ws_tester_broadcast_demo_trigger":
58 payload = {"demo": True, "requested_at": data.get("requested_at")}
59 await self.broadcast("ws_tester_broadcast_demo", payload)
60 PrintStyle.info("Harness broadcast demo event dispatched")
61 return None
62
63 if event == "ws_tester_request_all":
64 correlation_id = data.get("correlationId")
65 aggregated = await self.dispatch_to_all_sids(
66 "ws_tester_request",
67 {"value": data.get("marker", "aggregate")},
68 correlation_id=correlation_id,
69 )
70 return {"results": aggregated}

Callers

nothing calls this directly

Calls 10

debugMethod · 0.80
dispatch_to_all_sidsMethod · 0.80
errorMethod · 0.45
getMethod · 0.45
broadcastMethod · 0.45
infoMethod · 0.45
warningMethod · 0.45
emit_toMethod · 0.45

Tested by

no test coverage detected