MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / scripted_server

Function scripted_server

tests/client/test_client.py:430–456  ·  view source on GitHub ↗
(streams: MessageStream)

Source from the content-addressed store, hash-verified

428 methods_seen: list[str] = []
429
async def scripted_server(streams: MessageStream) -> None:
    """Drive the server side of ``streams`` with a fixed script.

    Every incoming frame must be a ``SessionMessage`` wrapping a JSON-RPC
    request or notification; its method name is appended to the enclosing
    ``methods_seen`` list. Notifications are recorded and never answered.

    Requests are answered as follows:

    * ``server/discover`` -> a JSON-RPC error with message ``"nope"`` and the
      error ``code`` taken from the enclosing scope (not visible in this
      block -- presumably a test parameter; confirm against the caller).
    * ``initialize`` -> a successful ``InitializeResult`` identifying the
      server as ``legacy-only`` at ``LATEST_HANDSHAKE_VERSION``.

    Any other request method is recorded but receives no reply.
    """
    inbound, outbound = streams
    async for envelope in inbound:
        assert isinstance(envelope, SessionMessage)
        payload = envelope.message
        assert isinstance(payload, types.JSONRPCRequest | types.JSONRPCNotification)
        methods_seen.append(payload.method)

        # Notifications carry no id, so there is nothing to reply to.
        if isinstance(payload, types.JSONRPCNotification):
            continue

        if payload.method == "server/discover":
            # Reject discovery with the scripted error code.
            rejection = types.JSONRPCError(
                jsonrpc="2.0",
                id=payload.id,
                error=types.ErrorData(code=code, message="nope"),
            )
            await server_reply(outbound, rejection)
        elif payload.method == "initialize":  # pragma: no branch
            handshake = types.InitializeResult(
                protocol_version=LATEST_HANDSHAKE_VERSION,
                capabilities=ServerCapabilities(),
                server_info=types.Implementation(name="legacy-only", version="0.0.1"),
            )
            # Serialize with wire aliases and without unset optional fields.
            body = handshake.model_dump(by_alias=True, mode="json", exclude_none=True)
            acceptance = types.JSONRPCResponse(jsonrpc="2.0", id=payload.id, result=body)
            await server_reply(outbound, acceptance)


async def server_reply(outbound, frame) -> None:
    """Wrap ``frame`` in a ``SessionMessage`` and send it on ``outbound``."""
    await outbound.send(SessionMessage(frame))
457
458 @asynccontextmanager
459 async def scripted_transport() -> AsyncIterator[TransportStreams]:

Callers

nothing calls this directly

Calls 3

SessionMessage (Class) · 0.90
ServerCapabilities (Class) · 0.90
send (Method) · 0.45

Tested by

no test coverage detected