A simplified Queue that provides a `put` method that is compatible with the asyncio event loop.
| 449 | |
| 450 | |
| 451 | class _SyncQueue: |
| 452 | """ |
| 453 | A simplified Queue that provides a `put` method that is compatible with the asyncio event loop. |
| 454 | """ |
| 455 | |
def __init__(self,
             queue: "AsyncQueue",
             loop: Optional[asyncio.AbstractEventLoop] = None):
    """Bind this wrapper to an async queue and an event loop.

    Args:
        queue: Stored as ``self._aq``; the other methods delegate to it.
        loop: Event loop stored as ``self._loop``. When falsy (for example
            ``None``), ``asyncio.get_event_loop()`` is used instead.
    """
    self._aq = queue
    if loop:
        self._loop = loop
    else:
        # No loop supplied: fall back to whatever asyncio reports.
        self._loop = asyncio.get_event_loop()
| 461 | |
async def _notify(self):
    """Coroutine that calls ``notify()`` on the wrapped queue.

    Wrapping the call in a coroutine lets it be handed to
    ``asyncio.run_coroutine_threadsafe``.
    """
    wrapped = self._aq
    wrapped.notify()
| 464 | |
def put(self, item) -> None:
    """Enqueue ``item`` and schedule a notification on the event loop.

    The item is handed to the wrapped queue via ``put_nowait`` first; the
    state of the loop is only checked afterwards.

    Args:
        item: Object forwarded to the wrapped queue.

    Raises:
        AsyncQueue.EventLoopShutdownError: If ``self._loop`` is not running.
            The item has already been enqueued when this is raised.
    """
    self._aq.put_nowait(item)

    # Guard clause: there is no running loop to execute the notification.
    if not self._loop.is_running():
        raise AsyncQueue.EventLoopShutdownError()

    # Submit the notify coroutine to the loop in a thread-safe manner.
    asyncio.run_coroutine_threadsafe(self._notify(), self._loop)
| 472 | |
def put_nowait(self, item) -> None:
    """Enqueue ``item`` on the wrapped queue without scheduling a notification."""
    target = self._aq
    target.put_nowait(item)
| 476 | |
# A single coroutine notifies every queue, which cuts down context switch overhead.
@staticmethod
async def _notify_many(queues: Iterable["_SyncQueue"]):
    """Call ``notify()`` on the wrapped queue of every entry in ``queues``.

    Args:
        queues: Objects exposing an ``_aq`` attribute with a ``notify()`` method.
    """
    for wrapped in (sync_queue._aq for sync_queue in queues):
        wrapped.notify()
| 482 | |
@staticmethod
def notify_many(loop: asyncio.AbstractEventLoop,
                queues: List["_SyncQueue"]) -> None:
    """Schedule one coroutine on ``loop`` that notifies all of ``queues``.

    The queues are collected into a ``frozenset`` before scheduling, so
    duplicate entries are notified once.

    Args:
        loop: Event loop that will run the notification coroutine.
        queues: The ``_SyncQueue`` instances to notify.

    Raises:
        AsyncQueue.EventLoopShutdownError: If ``loop`` is not running.
    """
    # Guard clause: nothing can be scheduled on a loop that is not running.
    if not loop.is_running():
        raise AsyncQueue.EventLoopShutdownError()

    pending = _SyncQueue._notify_many(frozenset(queues))
    asyncio.run_coroutine_threadsafe(pending, loop)
| 493 | |
@property
def loop(self) -> asyncio.AbstractEventLoop:
    """Event loop stored on this instance as ``_loop``."""
    bound_loop = self._loop
    return bound_loop
| 497 | |
def full(self) -> bool:
    """Report whether the wrapped queue is full, as given by its own ``full()``."""
    is_full = self._aq.full()
    return is_full
| 500 | |
| 501 | def get(self, timeout=None): |
| 502 | # Here is the WAR for jupyter scenario where trt-llm detects the event loop existence. |
| 503 | # However, this event loop launched by jupyter rather than trt-llm. It led the GenerationResult initialized |
| 504 | # w/ AsyncQueue and call the get() unintentionally. |
| 505 | |
| 506 | warnings.warn( |
| 507 | "LLM API is running in async mode because you have a running event loop," |
| 508 | " but you are using sync API. This may lead to potential performance loss." |
no outgoing calls
no test coverage detected