| 416 | raise asyncio.QueueEmpty() from None |
| 417 | |
async def get(self, timeout=None):
    """Remove and return the item at the left end of the queue.

    Args:
        timeout: Controls how long to wait while the queue is empty.
            ``None`` waits until an item is available; a value greater
            than zero waits at most that many seconds (measured on the
            running loop's clock); any other value does not wait at all.

    Returns:
        The item removed from the left end of the underlying deque.

    Raises:
        AsyncQueue.MixedSyncAsyncAPIError: If the queue is marked tainted.
        asyncio.TimeoutError: If a positive timeout expires while the
            queue is still empty.
        asyncio.QueueEmpty: If no waiting was requested and the queue
            is empty.
    """
    if self._tainted:
        raise AsyncQueue.MixedSyncAsyncAPIError()

    if timeout is None:
        # Unbounded wait. The queue is re-checked after every wake-up
        # because setting the event resumes every waiter, and a waiter
        # that ran earlier may already have taken the only item; popping
        # blindly would then raise IndexError.
        while True:
            if self._q:
                break
            await self._event.wait()
    elif not timeout > 0:
        # Non-blocking request: fail right away when nothing is queued.
        if not self._q:
            raise asyncio.QueueEmpty()
    else:
        # Bounded wait. A fixed deadline is computed once so that a wake-up
        # which finds the queue empty again only waits for the time left.
        clock = asyncio.get_running_loop().time
        expires_at = clock() + timeout
        while not self._q:
            time_left = expires_at - clock()
            if time_left <= 0:
                raise asyncio.TimeoutError()
            # wait_for itself raises asyncio.TimeoutError on expiry.
            await asyncio.wait_for(self._event.wait(), timeout=time_left)

    item = self._q.popleft()
    # Reset the event once the queue is drained so later callers block
    # instead of returning from wait() immediately.
    if not self._q:
        self._event.clear()
    return item
| 449 | |
| 450 | |
| 451 | class _SyncQueue: |