MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / raw_client_session

Function raw_client_session

tests/client/test_session.py:54–70  ·  view source on GitHub ↗

Yield `(session, send_to_client, recv_from_client)` with the receive loop running. `send_to_client` accepts `SessionMessage | Exception` so tests can inject transport-level exceptions. No initialize handshake is performed.

(
    **kwargs: Any,
)

Source from the content-addressed store, hash-verified

52
53@asynccontextmanager
54async def raw_client_session(
55 **kwargs: Any,
56) -> AsyncIterator[tuple[ClientSession, _SendToClient, _RecvFromClient]]:
57 """Yield `(session, send_to_client, recv_from_client)` with the receive loop running.
58
59 `send_to_client` accepts `SessionMessage | Exception` so tests can inject
60 transport-level exceptions. No initialize handshake is performed.
61 """
62 s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
63 c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage](32)
64 async with ClientSession(s2c_recv, c2s_send, **kwargs) as session:
65 try:
66 with anyio.fail_after(5):
67 yield session, s2c_send, c2s_recv
68 finally:
69 s2c_send.close()
70 c2s_recv.close()
71
72
73@pytest.mark.anyio

Calls 2

ClientSessionClass · 0.90
closeMethod · 0.45

Tested by

no test coverage detected