Yield `(session, send_to_client, recv_from_client)` with the receive loop running. `send_to_client` accepts `SessionMessage | Exception` so tests can inject transport-level exceptions. No initialize handshake is performed.
(
**kwargs: Any,
)
| 52 | |
| 53 | @asynccontextmanager |
| 54 | async def raw_client_session( |
| 55 | **kwargs: Any, |
| 56 | ) -> AsyncIterator[tuple[ClientSession, _SendToClient, _RecvFromClient]]: |
| 57 | """Yield `(session, send_to_client, recv_from_client)` with the receive loop running. |
| 58 | |
| 59 | `send_to_client` accepts `SessionMessage | Exception` so tests can inject |
| 60 | transport-level exceptions. No initialize handshake is performed. |
| 61 | """ |
| 62 | s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) |
| 63 | c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage](32) |
| 64 | async with ClientSession(s2c_recv, c2s_send, **kwargs) as session: |
| 65 | try: |
| 66 | with anyio.fail_after(5): |
| 67 | yield session, s2c_send, c2s_recv |
| 68 | finally: |
| 69 | s2c_send.close() |
| 70 | c2s_recv.close() |
| 71 | |
| 72 | |
| 73 | @pytest.mark.anyio |
no test coverage detected