MCPcopy
hub / github.com/agent0ai/agent-zero / _dispatch

Function _dispatch

helpers/ws.py:559–645  ·  view source on GitHub ↗
(event, sid, data)

Source from the content-addressed store, hash-verified

557
558 @socketio_server.on("*", namespace=NAMESPACE) # type: ignore
559 async def _dispatch(event, sid, data):
560 incoming = data if isinstance(data, dict) else {}
561
562 try:
563 with _contexts_lock:
564 ctx = _ws_contexts.get(sid)
565 activated = dict(_active_handlers.get(sid, {}))
566
567 correlation_id = incoming.get("correlationId") or uuid.uuid4().hex
568
569 if ctx is None:
570 return _error_response("AUTH_REQUIRED",
571 "No security context", correlation_id)
572 if not activated:
573 return _error_response("NO_HANDLERS",
574 "No handlers activated", correlation_id)
575
576 # Pre-filter handlers through security checks
577 passing_handlers: list[WsHandler] = []
578 security_errors: list[dict[str, Any]] = []
579 for path, instance in activated.items():
580 error = _check_security(type(instance), ctx)
581 if error is not None:
582 security_errors.append({
583 "handlerId": instance.identifier,
584 "ok": False,
585 "correlationId": correlation_id,
586 "error": error,
587 })
588 else:
589 passing_handlers.append(instance)
590
591 # Delegate to WsManager for unified processing pipeline
592 # (worker thread isolation, diagnostic events, WsResult support)
593 if manager is not None and passing_handlers:
594 result = await manager.process_client_event(
595 NAMESPACE, event, incoming, sid,
596 handlers=passing_handlers,
597 )
598 if security_errors:
599 result["results"] = security_errors + result.get("results", [])
600 return result
601
602 # All handlers failed security or no manager — return collected errors
603 if not passing_handlers:
604 return {"correlationId": correlation_id, "results": security_errors}
605
606 # Fallback: inline processing (no manager — should not happen in practice)
607 handler_payload: dict[str, Any]
608 if "data" in incoming and isinstance(incoming.get("data"), dict):
609 handler_payload = dict(incoming["data"])
610 else:
611 handler_payload = dict(incoming)
612 handler_payload["correlationId"] = correlation_id
613
614 results: list[dict[str, Any]] = list(security_errors)
615 for path, instance in activated.items():
616 if instance not in passing_handlers:

Callers 1

_invoke_for_sid · Method · 0.70

Calls 8

format_error · Function · 0.90
_error_response · Function · 0.85
_check_security · Function · 0.85
type · Function · 0.85
process_client_event · Method · 0.80
get · Method · 0.45
process · Method · 0.45
error · Method · 0.45

Tested by

no test coverage detected