Process a single WebSocket message from the queue. Args: q: The async queue to read from send_text: Async function to send text data over WebSocket messages: List to accumulate messages for batching (mutated in place) batch_delay: Batch delay in seconds Retu
(
q: "janus._AsyncQueueProxy[str]",
send_text: Callable[[str], Any],
messages: list[str],
batch_delay: float,
)
| 254 | |
| 255 | |
| 256 | async def _process_ws_message( |
| 257 | q: "janus._AsyncQueueProxy[str]", |
| 258 | send_text: Callable[[str], Any], |
| 259 | messages: list[str], |
| 260 | batch_delay: float, |
| 261 | ) -> bool: |
| 262 | """Process a single WebSocket message from the queue. |
| 263 | |
| 264 | Args: |
| 265 | q: The async queue to read from |
| 266 | send_text: Async function to send text data over WebSocket |
| 267 | messages: List to accumulate messages for batching (mutated in place) |
| 268 | batch_delay: Batch delay in seconds |
| 269 | |
| 270 | Returns: |
| 271 | True to continue processing, False to stop the sender loop. |
| 272 | """ |
| 273 | timeout = batch_delay if messages else None |
| 274 | try: |
| 275 | msg = await asyncio.wait_for(q.get(), timeout=timeout) |
| 276 | except asyncio.TimeoutError: |
| 277 | success = await _send_batched(send_text, messages) |
| 278 | messages.clear() |
| 279 | return success |
| 280 | |
| 281 | if msg == SHUTDOWN_SIGNAL: |
| 282 | if messages: |
| 283 | await _send_batched(send_text, messages) |
| 284 | return False |
| 285 | |
| 286 | if msg == FLUSH_SIGNAL: |
| 287 | success = not messages or await _send_batched(send_text, messages) |
| 288 | messages.clear() |
| 289 | return success |
| 290 | |
| 291 | if not batch_delay: |
| 292 | try: |
| 293 | await send_text(msg) |
| 294 | except Exception: # pylint: disable=broad-exception-caught |
| 295 | return False # WebSocketDisconnect, RuntimeError, etc. |
| 296 | else: |
| 297 | messages.append(msg) |
| 298 | |
| 299 | return True |
| 300 | |
| 301 | |
| 302 | async def _send_batched(send_text: Callable[[str], Any], messages: list) -> bool: |
no test coverage detected
searching dependent graphs…