MCPcopy
hub / github.com/NVIDIA/TensorRT-LLM / _SyncQueue

Class _SyncQueue

tensorrt_llm/llmapi/utils.py:451–518  ·  view source on GitHub ↗

A simplified queue wrapper whose `put` method is compatible with the asyncio event loop: it enqueues the item and schedules the notification on that loop.

Source retrieved from the content-addressed store (hash-verified).

449
450
451class _SyncQueue:
452 """
453 A simplified Queue that provides a `put` method that is compatible with the asyncio event loop.
454 """
455
456 def __init__(self,
457 queue: "AsyncQueue",
458 loop: Optional[asyncio.AbstractEventLoop] = None):
459 self._aq = queue
460 self._loop = loop or asyncio.get_event_loop()
461
462 async def _notify(self):
463 self._aq.notify()
464
465 def put(self, item) -> None:
466 self._aq.put_nowait(item)
467
468 if self._loop.is_running():
469 asyncio.run_coroutine_threadsafe(self._notify(), self._loop)
470 else:
471 raise AsyncQueue.EventLoopShutdownError()
472
473 def put_nowait(self, item) -> None:
474 """ Put item without notify the event. """
475 self._aq.put_nowait(item)
476
477 # Notify many queues in one coroutine, to cut down context switch overhead.
478 @staticmethod
479 async def _notify_many(queues: Iterable["_SyncQueue"]):
480 for queue in queues:
481 queue._aq.notify()
482
483 @staticmethod
484 def notify_many(loop: asyncio.AbstractEventLoop,
485 queues: List["_SyncQueue"]) -> None:
486 """ Notify the events in the loop. """
487
488 if loop.is_running():
489 asyncio.run_coroutine_threadsafe(
490 _SyncQueue._notify_many(frozenset(queues)), loop)
491 else:
492 raise AsyncQueue.EventLoopShutdownError()
493
494 @property
495 def loop(self) -> asyncio.AbstractEventLoop:
496 return self._loop
497
498 def full(self) -> bool:
499 return self._aq.full()
500
501 def get(self, timeout=None):
502 # Here is the WAR for jupyter scenario where trt-llm detects the event loop existence.
503 # However, this event loop launched by jupyter rather than trt-llm. It led the GenerationResult initialized
504 # w/ AsyncQueue and call the get() unintentionally.
505
506 warnings.warn(
507 "LLM API is running in async mode because you have a running event loop,"
508 " but you are using sync API. This may lead to potential performance loss."

Callers 2

__init__ (method) · 0.85
_call_streaming (method) · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected