MCPcopy
hub / github.com/authlib/authlib / AsyncPathMapDispatch

Class AsyncPathMapDispatch

tests/clients/asgi_helper.py:38–67  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

36
37
class AsyncPathMapDispatch:
    """ASGI callable that answers requests with canned, per-path responses.

    ``path_maps`` maps a request path to a response description, a mapping
    that may contain the keys ``"status_code"`` (defaults to 200), ``"body"``
    and ``"headers"``.  ``side_effects`` optionally maps a request path to a
    callable that is invoked with the incoming request before the response
    is produced.

    NOTE(review): ``ASGIRequest`` and ``ASGIResponse`` are imported elsewhere
    in this module (presumably Starlette's request/response classes) --
    confirm against the file's import block.
    """

    def __init__(self, path_maps, side_effects=None):
        """Store the path -> response table and the optional side effects."""
        self.path_maps = path_maps
        self.side_effects = side_effects or {}

    @staticmethod
    def _build_response_parts(rv):
        """Return ``(status_code, body, headers)`` for one path-map entry.

        A ``dict`` body is serialised to JSON bytes and labelled
        ``application/json``.  Any other body is labelled
        ``application/x-www-form-urlencoded``; a ``str`` body is encoded to
        bytes first, other values are passed through untouched.
        """
        status_code = rv.get("status_code", 200)
        body = rv.get("body")
        # Work on a copy: the entry belongs to the caller and is reused for
        # every request, so writing Content-Type into it directly would
        # silently modify the caller's fixture data.
        headers = dict(rv.get("headers", {}))

        if isinstance(body, dict):
            body = json.dumps(body).encode()
            headers["Content-Type"] = "application/json"
        else:
            if isinstance(body, str):
                body = body.encode()
            headers["Content-Type"] = "application/x-www-form-urlencoded"

        return status_code, body, headers

    async def __call__(self, scope, receive, send):
        """Handle one ASGI request by replaying the response for its path.

        The path is looked up with ``self.path_maps[...]``, so with a plain
        ``dict`` an unregistered path raises ``KeyError``.
        """
        request = ASGIRequest(scope, receive=receive)

        # Let the test observe or assert on the request before replying.
        side_effect = self.side_effects.get(request.url.path)
        if side_effect is not None:
            side_effect(request)

        rv = self.path_maps[request.url.path]
        status_code, body, headers = self._build_response_parts(rv)

        response = ASGIResponse(
            status_code=status_code,
            content=body,
            headers=headers,
        )
        await response(scope, receive, send)

Calls

no outgoing calls

Used in the wild — real call sites across dependent graphs

searching dependent graphs…