hub / github.com/modelcontextprotocol/python-sdk / _drive

Function _drive

tests/server/test_runner.py:128–132
(*, task_status: anyio.abc.TaskStatus[None])

Source from the content-addressed store, hash-verified

126  body_exc: BaseException | None = None
127
128  async def _drive(*, task_status: anyio.abc.TaskStatus[None]) -> None:
129      try:
130          await server_d.run(runner.on_request, runner.on_notify, task_status=task_status)
131      finally:
132          await aclose_shielded(connection)
133
134  async with anyio.create_task_group() as tg:
135      await tg.start(client.run, c_req, c_notify)

Callers

nothing calls this directly

Calls 2

aclose_shielded · Function · 0.90
run · Method · 0.45

Tested by

no test coverage detected