Set up the WebSocket endpoint for callback handling. Uses a thread pool executor for callback execution, with janus queues for async/sync communication between the main loop and worker threads. Args: dash_app: The Dash application instance.
(self, dash_app: "Dash")
| 507 | return None |
| 508 | |
| 509 | def serve_websocket_callback(self, dash_app: "Dash"): |
| 510 | """Set up the WebSocket endpoint for callback handling. |
| 511 | |
| 512 | Uses thread pool executor for callback execution with janus queues |
| 513 | for async/sync communication between main loop and worker threads. |
| 514 | |
| 515 | Args: |
| 516 | dash_app: The Dash application instance |
| 517 | """ |
| 518 | # pylint: disable=too-many-statements,too-many-locals |
| 519 | ws_path = dash_app.config.routes_pathname_prefix + "_dash-ws-callback" |
| 520 | # pylint: disable=protected-access |
| 521 | allowed_origins = getattr(dash_app, "_websocket_allowed_origins", []) |
| 522 | |
| 523 | @self.server.websocket(ws_path) |
| 524 | async def websocket_handler(): # pylint: disable=too-many-branches |
| 525 | ws = websocket |
| 526 | |
| 527 | # Validate Origin header |
| 528 | error = self._validate_ws_origin( |
| 529 | ws.headers.get("origin"), ws.headers.get("host"), allowed_origins |
| 530 | ) |
| 531 | if error: |
| 532 | await ws.close(code=4003, reason=error) |
| 533 | return |
| 534 | |
| 535 | # Call websocket_connect hooks |
| 536 | # pylint: disable=protected-access |
| 537 | rejection = await self._run_ws_hooks( |
| 538 | dash_app._hooks.get_hooks("websocket_connect"), |
| 539 | ws, |
| 540 | default_reason="Connection rejected", |
| 541 | ) |
| 542 | if rejection: |
| 543 | await ws.close(code=rejection[0], reason=rejection[1]) |
| 544 | return |
| 545 | |
| 546 | await ws.accept() |
| 547 | |
| 548 | # Track this connection for graceful shutdown |
| 549 | try: |
| 550 | ws_obj = ws._get_current_object() |
| 551 | self._active_websockets.add(ws_obj) |
| 552 | except AttributeError: |
| 553 | ws_obj = ws |
| 554 | self._active_websockets.add(ws) |
| 555 | |
| 556 | # Create janus queue for outbound messages (main loop context) |
| 557 | outbound_queue: janus.Queue[str] = janus.Queue() |
| 558 | # Track pending get_props requests with standard queue.Queue for responses |
| 559 | pending_get_props: Dict[str, queue.Queue] = {} |
| 560 | # Shutdown event to signal connection closure to worker threads |
| 561 | connection_shutdown_event = threading.Event() |
| 562 | # Get thread pool executor |
| 563 | executor = self.get_callback_executor() |
| 564 | # Track pending callback futures |
| 565 | pending_callbacks: Dict[str, concurrent.futures.Future] = {} |
| 566 |