()
| 522 | |
| 523 | @self.server.websocket(ws_path) |
| 524 | async def websocket_handler(): # pylint: disable=too-many-branches |
| 525 | ws = websocket |
| 526 | |
| 527 | # Validate Origin header |
| 528 | error = self._validate_ws_origin( |
| 529 | ws.headers.get("origin"), ws.headers.get("host"), allowed_origins |
| 530 | ) |
| 531 | if error: |
| 532 | await ws.close(code=4003, reason=error) |
| 533 | return |
| 534 | |
| 535 | # Call websocket_connect hooks |
| 536 | # pylint: disable=protected-access |
| 537 | rejection = await self._run_ws_hooks( |
| 538 | dash_app._hooks.get_hooks("websocket_connect"), |
| 539 | ws, |
| 540 | default_reason="Connection rejected", |
| 541 | ) |
| 542 | if rejection: |
| 543 | await ws.close(code=rejection[0], reason=rejection[1]) |
| 544 | return |
| 545 | |
| 546 | await ws.accept() |
| 547 | |
| 548 | # Track this connection for graceful shutdown |
| 549 | try: |
| 550 | ws_obj = ws._get_current_object() |
| 551 | self._active_websockets.add(ws_obj) |
| 552 | except AttributeError: |
| 553 | ws_obj = ws |
| 554 | self._active_websockets.add(ws) |
| 555 | |
| 556 | # Create janus queue for outbound messages (main loop context) |
| 557 | outbound_queue: janus.Queue[str] = janus.Queue() |
| 558 | # Track pending get_props requests with standard queue.Queue for responses |
| 559 | pending_get_props: Dict[str, queue.Queue] = {} |
| 560 | # Shutdown event to signal connection closure to worker threads |
| 561 | connection_shutdown_event = threading.Event() |
| 562 | # Get thread pool executor |
| 563 | executor = self.get_callback_executor() |
| 564 | # Track pending callback futures |
| 565 | pending_callbacks: Dict[str, concurrent.futures.Future] = {} |
| 566 | |
| 567 | # Start sender task to drain outbound queue (sends pre-serialized text) |
| 568 | # pylint: disable=protected-access |
| 569 | batch_delay = getattr(dash_app, "_websocket_batch_delay", 0.005) |
| 570 | sender_task = asyncio.create_task( |
| 571 | run_ws_sender(ws.send, outbound_queue, batch_delay) |
| 572 | ) |
| 573 | |
| 574 | try: |
| 575 | shutdown_event = self._ws_shutdown_event |
| 576 | while shutdown_event is None or not shutdown_event.is_set(): |
| 577 | try: |
| 578 | # Use timeout to periodically check shutdown event |
| 579 | message = await asyncio.wait_for(ws.receive_json(), timeout=1.0) |
| 580 | except asyncio.TimeoutError: |
| 581 | # Re-check shutdown event (may have been set during run()) |
Nothing calls `websocket_handler` directly; it is decorated with `@self.server.websocket(ws_path)`.
No test coverage was detected for it.