Two `JSONRPCDispatcher`s wired over crossed in-memory streams.
(*, can_send_request: bool = True)
| 31 | |
| 32 | |
def jsonrpc_pair(*, can_send_request: bool = True) -> DispatcherTriple:
    """Build a client/server pair of `JSONRPCDispatcher`s joined by in-memory streams.

    Two memory object streams are created, each constructed with the argument 32:
    one for the client-to-server direction and one for the server-to-client
    direction. Each dispatcher is handed the receive end of one stream and the
    send end of the other, so the two are cross-connected.

    Args:
        can_send_request: Forwarded unchanged to every `TransportContext` that the
            shared transport builder produces for either dispatcher.

    Returns:
        A `(client, server, close)` triple; calling `close()` closes all four
        stream ends.
    """
    to_server_tx, to_server_rx = anyio.create_memory_object_stream[SessionMessage | Exception](32)
    to_client_tx, to_client_rx = anyio.create_memory_object_stream[SessionMessage | Exception](32)

    def make_context(_meta: object) -> TransportContext:
        # The argument is ignored: every context carries the same settings.
        return TransportContext(kind="jsonrpc", can_send_request=can_send_request)

    # Client is given the server->client receive end and the client->server send end.
    client: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
        to_client_rx,
        to_server_tx,
        transport_builder=make_context,
    )
    # Server is given the mirror image of the client's ends.
    server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
        to_server_rx,
        to_client_tx,
        transport_builder=make_context,
    )

    def close() -> None:
        # Close both ends of the client->server stream, then both ends of the
        # server->client stream.
        to_server_tx.close()
        to_server_rx.close()
        to_client_tx.close()
        to_client_rx.close()

    return client, server, close
| 49 | |
| 50 | |
| 51 | @pytest.fixture( |
no test coverage detected