MCPcopy Index your code
hub / github.com/modelcontextprotocol/python-sdk / factory

Function factory

tests/shared/test_sse.py:55–70  ·  view source on GitHub ↗
(
        headers: dict[str, str] | None = None,
        timeout: httpx.Timeout | None = None,
        auth: httpx.Auth | None = None,
    )

Source from the content-addressed store, hash-verified

53 """An httpx_client_factory for sse_client whose clients are served in process by `app`."""
54
55 def factory(
56 headers: dict[str, str] | None = None,
57 timeout: httpx.Timeout | None = None,
58 auth: httpx.Auth | None = None,
59 ) -> httpx.AsyncClient:
60 # The SSE GET runs until it observes a disconnect, so the bridge must let the
61 # application drain on close rather than cancelling it. follow_redirects matches
62 # create_mcp_http_client, the factory this one stands in for.
63 return httpx.AsyncClient(
64 transport=StreamingASGITransport(app, cancel_on_close=False),
65 base_url=BASE_URL,
66 headers=headers,
67 timeout=timeout,
68 auth=auth,
69 follow_redirects=True,
70 )
71
72 return factory
73

Callers 2

running_pairFunction · 0.70

Calls 1

Tested by

no test coverage detected