(self, async_iter, prog_bar, metrics)
| 77 | return stop_token_ids |
| 78 | |
def async_iter_to_iter(self, async_iter, prog_bar, metrics) -> Iterator:
    """Expose an awaitable async iterator as a blocking, synchronous generator.

    A worker thread drives ``async_iter`` on an event loop and pushes every
    produced item onto a queue; this generator pops the queue and yields the
    items in order.

    Args:
        async_iter: An awaitable that resolves to an async iterable (it is
            consumed as ``async for item in await async_iter``).
        prog_bar: Object whose ``update()`` is called once when the stream
            ends without a propagated error.
        metrics: Passed through unchanged to ``self._update_metrics``
            together with the last yielded item (``None`` if nothing was
            yielded).

    Yields:
        Each item produced by the async iterable, including ``None`` items.

    Raises:
        Exception: Whatever the async side raised, re-raised in the calling
            thread once the already-produced items have been yielded, when
            ``self.strict`` is true (the default when the attribute is
            missing). With ``strict`` false the error is swallowed and the
            stream simply ends. Non-``Exception`` ``BaseException``s (for
            example cancellation) always propagate.
    """
    queue = Queue()
    # A private marker cannot collide with a real item, unlike ``None`` or an
    # exception instance used as an in-band terminator.
    end_of_stream = object()
    failure = []

    async def _run_async_iter():
        async for item in await async_iter:
            queue.put(item)

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    def _worker():
        try:
            loop.run_until_complete(_run_async_iter())
        except BaseException as exc:  # carried over to the consumer thread
            failure.append(exc)
        finally:
            # Always unblock the consumer, and only after the loop has
            # stopped running, so a failure can never leave ``queue.get()``
            # waiting forever.
            queue.put(end_of_stream)

    thread = Thread(target=_worker)
    thread.start()
    pre_output = None
    while True:
        output = queue.get()
        if output is end_of_stream:
            break
        pre_output = output
        yield output
    thread.join()

    if failure:
        error = failure[0]
        if getattr(self, 'strict', True) or not isinstance(error, Exception):
            raise error
    prog_bar.update()
    self._update_metrics(pre_output, metrics)
| 109 | |
| 110 | @staticmethod |
| 111 | async def batch_run(tasks): |
no test coverage detected