MCPcopy
hub / github.com/NVIDIA/TensorRT-LLM / AsyncQueue

Class AsyncQueue

tensorrt_llm/llmapi/utils.py:364–448  ·  view source on GitHub ↗

A queue-style container that provides both sync and async interfaces. It is used to provide an interface compatible with janus.Queue.

Source from the content-addressed store, hash-verified

362
363
364class AsyncQueue:
365 """
366 A queue-style container that provides both sync and async interface.
367 This is used to provide a compatible interface for janus.Queue.
368 """
369
370 class EventLoopShutdownError(Exception):
371 pass
372
373 class MixedSyncAsyncAPIError(Exception):
374 pass
375
376 def __init__(self):
377 self._q = collections.deque()
378 self._event = asyncio.Event()
379 self._tainted = False
380 self._sync_q = _SyncQueue(self)
381
382 @property
383 def sync_q(self):
384 return self._sync_q
385
386 def full(self) -> bool:
387 return len(self._q) == self._q.maxlen
388
389 def empty(self) -> bool:
390 return not self._q
391
392 def put(self, item) -> None:
393 self._q.append(item)
394 self._event.set()
395
396 # Decoupled put and notify.
397 # Deque is thread safe so we can put from outside the event loop.
398 # However, we have to schedule notify in event loop because it's not thread safe.
399 # In this case the notify part may get scheduled late, to the point that
400 # corresponding data in deque may have already been consumed.
401 # Avoid firing the event in this case.
402 def put_nowait(self, item) -> None:
403 self._q.append(item)
404
405 def notify(self) -> None:
406 if self._q:
407 self._event.set()
408
409 def unsafe_get(self):
410 # Unsafe get taints the queue, renders it unusable by async methods.
411 self._tainted = True
412 # Use exception to detect empty. Pre-check is not thread safe.
413 try:
414 return self._q.popleft()
415 except IndexError:
416 raise asyncio.QueueEmpty() from None
417
418 async def get(self, timeout=None):
419 if self._tainted:
420 raise AsyncQueue.MixedSyncAsyncAPIError()
421

Callers 4

test_AsyncQueue (Function) · 0.90
__init__ (Method) · 0.85
__init__ (Method) · 0.85
_call_streaming (Method) · 0.85

Calls

no outgoing calls

Tested by 1

test_AsyncQueue (Function) · 0.72