MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / in_process_client_factory

Function in_process_client_factory

tests/shared/test_sse.py:52–72  ·  view source on GitHub ↗

An httpx_client_factory for sse_client whose clients are served in process by `app`.

(app: Starlette)

Source from the content-addressed store, hash-verified

50
51
def in_process_client_factory(app: Starlette) -> McpHttpClientFactory:
    """Build an httpx_client_factory for sse_client; every client it makes is served in process by `app`."""

    def factory(
        headers: dict[str, str] | None = None,
        timeout: httpx.Timeout | None = None,
        auth: httpx.Auth | None = None,
    ) -> httpx.AsyncClient:
        # cancel_on_close=False: the SSE GET only finishes once it observes a disconnect,
        # so closing the client has to let the application drain instead of cancelling it.
        in_process_transport = StreamingASGITransport(app, cancel_on_close=False)

        # follow_redirects=True mirrors create_mcp_http_client, the factory this one replaces.
        return httpx.AsyncClient(
            base_url=BASE_URL,
            transport=in_process_transport,
            follow_redirects=True,
            headers=headers,
            timeout=timeout,
            auth=auth,
        )

    return factory
73
74
75async def _handle_read_resource(ctx: ServerRequestContext, params: ReadResourceRequestParams) -> ReadResourceResult:

Calls

no outgoing calls

Tested by

no test coverage detected