Cancel the root task if still running, then await it. The task may still be running if the caller stopped iterating early (e.g., break in async for). In that case we must cancel to avoid a leaked task.
(
self, task: asyncio.Task, node_name: str
)
| 797 | processed_signal.set() |
| 798 | |
| 799 | async def _cleanup_root_task( |
| 800 | self, task: asyncio.Task, node_name: str |
| 801 | ) -> None: |
| 802 | """Cancel the root task if still running, then await it. |
| 803 | |
| 804 | The task may still be running if the caller stopped iterating |
| 805 | early (e.g., break in async for). In that case we must cancel |
| 806 | to avoid a leaked task. |
| 807 | """ |
| 808 | if not task.done(): |
| 809 | logger.debug( |
| 810 | 'Cancelling root node %s (caller stopped early).', |
| 811 | node_name, |
| 812 | ) |
| 813 | task.cancel() |
| 814 | try: |
| 815 | await task |
| 816 | except asyncio.CancelledError: |
| 817 | logger.warning('Root node %s was cancelled.', node_name) |
| 818 | except Exception: |
| 819 | logger.error('Root node %s failed.', node_name, exc_info=True) |
| 820 | raise |
| 821 | |
| 822 | async def _get_or_create_session( |
| 823 | self, |
no test coverage detected