An httpx_client_factory for sse_client whose clients are served in process by `app`.
(app: Starlette)
| 50 | |
| 51 | |
| 52 | def in_process_client_factory(app: Starlette) -> McpHttpClientFactory: |
| 53 | """An httpx_client_factory for sse_client whose clients are served in process by `app`.""" |
| 54 | |
| 55 | def factory( |
| 56 | headers: dict[str, str] | None = None, |
| 57 | timeout: httpx.Timeout | None = None, |
| 58 | auth: httpx.Auth | None = None, |
| 59 | ) -> httpx.AsyncClient: |
| 60 | # The SSE GET runs until it observes a disconnect, so the bridge must let the |
| 61 | # application drain on close rather than cancelling it. follow_redirects matches |
| 62 | # create_mcp_http_client, the factory this one stands in for. |
| 63 | return httpx.AsyncClient( |
| 64 | transport=StreamingASGITransport(app, cancel_on_close=False), |
| 65 | base_url=BASE_URL, |
| 66 | headers=headers, |
| 67 | timeout=timeout, |
| 68 | auth=auth, |
| 69 | follow_redirects=True, |
| 70 | ) |
| 71 | |
| 72 | return factory |
| 73 | |
| 74 | |
| 75 | async def _handle_read_resource(ctx: ServerRequestContext, params: ReadResourceRequestParams) -> ReadResourceResult: |
no outgoing calls
no test coverage detected