(*, task_status: anyio.abc.TaskStatus[None])
| 126 | body_exc: BaseException | None = None |
| 127 | |
| 128 | async def _drive(*, task_status: anyio.abc.TaskStatus[None]) -> None: |
| 129 | try: |
| 130 | await server_d.run(runner.on_request, runner.on_notify, task_status=task_status) |
| 131 | finally: |
| 132 | await aclose_shielded(connection) |
| 133 | |
| 134 | async with anyio.create_task_group() as tg: |
| 135 | await tg.start(client.run, c_req, c_notify) |
nothing calls this directly
no test coverage detected