(event, sid, data)
| 557 | |
| 558 | @socketio_server.on("*", namespace=NAMESPACE) # type: ignore |
| 559 | async def _dispatch(event, sid, data): |
| 560 | incoming = data if isinstance(data, dict) else {} |
| 561 | |
| 562 | try: |
| 563 | with _contexts_lock: |
| 564 | ctx = _ws_contexts.get(sid) |
| 565 | activated = dict(_active_handlers.get(sid, {})) |
| 566 | |
| 567 | correlation_id = incoming.get("correlationId") or uuid.uuid4().hex |
| 568 | |
| 569 | if ctx is None: |
| 570 | return _error_response("AUTH_REQUIRED", |
| 571 | "No security context", correlation_id) |
| 572 | if not activated: |
| 573 | return _error_response("NO_HANDLERS", |
| 574 | "No handlers activated", correlation_id) |
| 575 | |
| 576 | # Pre-filter handlers through security checks |
| 577 | passing_handlers: list[WsHandler] = [] |
| 578 | security_errors: list[dict[str, Any]] = [] |
| 579 | for path, instance in activated.items(): |
| 580 | error = _check_security(type(instance), ctx) |
| 581 | if error is not None: |
| 582 | security_errors.append({ |
| 583 | "handlerId": instance.identifier, |
| 584 | "ok": False, |
| 585 | "correlationId": correlation_id, |
| 586 | "error": error, |
| 587 | }) |
| 588 | else: |
| 589 | passing_handlers.append(instance) |
| 590 | |
| 591 | # Delegate to WsManager for unified processing pipeline |
| 592 | # (worker thread isolation, diagnostic events, WsResult support) |
| 593 | if manager is not None and passing_handlers: |
| 594 | result = await manager.process_client_event( |
| 595 | NAMESPACE, event, incoming, sid, |
| 596 | handlers=passing_handlers, |
| 597 | ) |
| 598 | if security_errors: |
| 599 | result["results"] = security_errors + result.get("results", []) |
| 600 | return result |
| 601 | |
| 602 | # All handlers failed security or no manager — return collected errors |
| 603 | if not passing_handlers: |
| 604 | return {"correlationId": correlation_id, "results": security_errors} |
| 605 | |
| 606 | # Fallback: inline processing (no manager — should not happen in practice) |
| 607 | handler_payload: dict[str, Any] |
| 608 | if "data" in incoming and isinstance(incoming.get("data"), dict): |
| 609 | handler_payload = dict(incoming["data"]) |
| 610 | else: |
| 611 | handler_payload = dict(incoming) |
| 612 | handler_payload["correlationId"] = correlation_id |
| 613 | |
| 614 | results: list[dict[str, Any]] = list(security_errors) |
| 615 | for path, instance in activated.items(): |
| 616 | if instance not in passing_handlers: |
no test coverage detected