Sender coroutine - drains queue and sends to WebSocket. This coroutine runs in the main event loop and handles sending messages that are queued by worker threads via janus.Queue. Messages are pre-serialized strings (using to_json). For efficiency, this function batches messages that arrive within batch_delay of each other, sending them as a JSON array.
(
send_text: Callable[[str], Any],
outbound_queue: janus.Queue[str],
batch_delay: float = 0.005,
)
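The body of `_process_ws_message` is not shown here, so the following is only a minimal sketch of the batching behavior described above, not the project's implementation. It assumes a plain `asyncio.Queue` standing in for `janus.Queue.async_q`, and the names `drain_batch`, `encode_batch`, and `demo` are hypothetical. It also assumes that a lone message is sent unwrapped, which the source does not confirm.

```python
from __future__ import annotations

import asyncio
import json


async def drain_batch(q: asyncio.Queue[str], batch_delay: float) -> list[str]:
    """Wait for one message, then collect any that follow within batch_delay."""
    messages = [await q.get()]
    if batch_delay <= 0:
        # Batching disabled: hand back the single message right away.
        return messages
    while True:
        try:
            # Each new message restarts the window, so messages arriving
            # within batch_delay of each other end up in the same batch.
            messages.append(await asyncio.wait_for(q.get(), timeout=batch_delay))
        except asyncio.TimeoutError:
            # Nothing arrived within the window: the batch is complete.
            return messages


def encode_batch(messages: list[str]) -> str:
    """Join pre-serialized JSON strings into one JSON array without re-parsing."""
    if len(messages) == 1:
        # Assumption: a single message is sent as-is rather than wrapped.
        return messages[0]
    return "[" + ",".join(messages) + "]"


async def demo() -> list[str]:
    q: asyncio.Queue[str] = asyncio.Queue()
    sent: list[str] = []

    async def send_text(data: str) -> None:
        # Stand-in for the WebSocket send function.
        sent.append(data)

    for i in range(3):
        q.put_nowait(json.dumps({"n": i}))

    batch = await drain_batch(q, batch_delay=0.005)
    await send_text(encode_batch(batch))
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the messages are already serialized, the sketch builds the array by string concatenation instead of decoding and re-encoding each message. This only yields valid JSON if every queued string is itself valid JSON.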
async def run_ws_sender(
    send_text: Callable[[str], Any],
    outbound_queue: janus.Queue[str],
    batch_delay: float = 0.005,
) -> None:
    """Sender coroutine - drains queue and sends to WebSocket.

    This coroutine runs in the main event loop and handles sending
    messages that are queued by worker threads via janus.Queue.

    Messages are pre-serialized strings (using to_json). For efficiency,
    this function batches messages that arrive within batch_delay of each
    other, sending them as a JSON array. When no message arrives within
    the window, all collected messages are sent immediately.

    Args:
        send_text: Async function to send text data over WebSocket
        outbound_queue: janus.Queue instance for receiving messages (strings)
        batch_delay: Time in seconds to wait for additional messages (default: 5ms).
            Set to 0 to disable batching and send messages immediately.
    """
    q = outbound_queue.async_q
    messages: list[str] = []
    try:
        while True:
            result = await _process_ws_message(q, send_text, messages, batch_delay)
            if result is False:
                return
    except asyncio.CancelledError:
        pass


async def _process_ws_message(