(streams: MessageStream)
| 428 | methods_seen: list[str] = [] |
| 429 | |
| 430 | async def scripted_server(streams: MessageStream) -> None: |
| 431 | server_read, server_write = streams |
| 432 | async for message in server_read: |
| 433 | assert isinstance(message, SessionMessage) |
| 434 | frame = message.message |
| 435 | assert isinstance(frame, types.JSONRPCRequest | types.JSONRPCNotification) |
| 436 | methods_seen.append(frame.method) |
| 437 | if isinstance(frame, types.JSONRPCNotification): |
| 438 | continue |
| 439 | if frame.method == "server/discover": |
| 440 | error = types.ErrorData(code=code, message="nope") |
| 441 | await server_write.send(SessionMessage(types.JSONRPCError(jsonrpc="2.0", id=frame.id, error=error))) |
| 442 | elif frame.method == "initialize": # pragma: no branch |
| 443 | result = types.InitializeResult( |
| 444 | protocol_version=LATEST_HANDSHAKE_VERSION, |
| 445 | capabilities=ServerCapabilities(), |
| 446 | server_info=types.Implementation(name="legacy-only", version="0.0.1"), |
| 447 | ) |
| 448 | await server_write.send( |
| 449 | SessionMessage( |
| 450 | types.JSONRPCResponse( |
| 451 | jsonrpc="2.0", |
| 452 | id=frame.id, |
| 453 | result=result.model_dump(by_alias=True, mode="json", exclude_none=True), |
| 454 | ) |
| 455 | ) |
| 456 | ) |
| 457 | |
| 458 | @asynccontextmanager |
| 459 | async def scripted_transport() -> AsyncIterator[TransportStreams]: |
nothing calls this directly
no test coverage detected