WebSocket callback communication via queues. Provides methods for real-time bidirectional communication between the server and renderer during callback execution. Uses janus.Queue for outbound messages (serialized with to_json) and queue.Queue for get_props responses, enabling thre
| 34 | |
| 35 | |
| 36 | class DashWebsocketCallback: |
| 37 | """WebSocket callback communication via queues. |
| 38 | |
| 39 | Provides methods for real-time bidirectional communication between |
| 40 | the server and renderer during callback execution. |
| 41 | |
| 42 | Uses janus.Queue for outbound messages (serialized with to_json) and |
| 43 | queue.Queue for get_props responses, enabling thread-safe communication |
| 44 | between worker threads and the main event loop. |
| 45 | |
| 46 | IMPORTANT: For long-running callbacks that use loops (e.g., streaming updates), |
| 47 | you MUST check `ws.is_shutdown` in your loop to detect disconnections: |
| 48 | |
| 49 | @callback(Input('btn', 'n_clicks')) # No Output - uses set_props only |
| 50 | async def long_running(n_clicks): |
| 51 | ws = ctx.websocket |
| 52 | while True: |
| 53 | if ws and ws.is_shutdown: |
| 54 | raise PreventUpdate # Exit gracefully on disconnect |
| 55 | set_props('progress', {'value': get_data()}) |
| 56 | await asyncio.sleep(0.1) |
| 57 | |
| 58 | Without this check, callbacks will continue running after the client disconnects, |
| 59 | wasting server resources. |
| 60 | |
| 61 | Note: Only "persistent callbacks" (callbacks with no Output and no Input that use |
| 62 | only set_props) are automatically restarted when the WebSocket reconnects. Regular |
| 63 | callbacks with outputs are not restarted. |
| 64 | """ |
| 65 | |
| 66 | def __init__( |
| 67 | self, |
| 68 | pending_get_props: Dict[str, queue.Queue[Any]], |
| 69 | renderer_id: str, |
| 70 | outbound_queue: janus.Queue[str], |
| 71 | shutdown_event: "threading.Event", |
| 72 | ): |
| 73 | """Initialize the WebSocket callback interface. |
| 74 | |
| 75 | Args: |
| 76 | pending_get_props: Dict to track pending get_props requests. |
| 77 | Values are queue.Queue instances for blocking response retrieval. |
| 78 | renderer_id: The renderer ID for routing messages back to the correct client |
| 79 | outbound_queue: janus.Queue for thread-safe outbound messaging. |
| 80 | shutdown_event: Event signaling the websocket connection has closed. |
| 81 | """ |
| 82 | self._pending_get_props = pending_get_props |
| 83 | self._renderer_id = renderer_id |
| 84 | self._outbound_queue = outbound_queue |
| 85 | self._shutdown_event = shutdown_event |
| 86 | |
| 87 | @property |
| 88 | def is_shutdown(self) -> bool: |
| 89 | """Check if the websocket connection has been shut down.""" |
| 90 | return self._shutdown_event.is_set() |
| 91 | |
| 92 | def _get_outbound_queue(self) -> janus.Queue[str] | None: |
| 93 | """Get the outbound queue.""" |
no outgoing calls
no test coverage detected
searching dependent graphs…