(
request: Request, call_next: Callable[[Request], Awaitable[Any]]
)
| 829 | |
| 830 | @app.middleware("http") |
| 831 | async def context_propagation( |
| 832 | request: Request, call_next: Callable[[Request], Awaitable[Any]] |
| 833 | ) -> Any: |
| 834 | ctx = get_propagated_context(request) |
| 835 | token = context.attach(ctx) |
| 836 | try: |
| 837 | response = await call_next(request) |
| 838 | return response |
| 839 | finally: |
| 840 | context.detach(token) |
| 841 | |
| 842 | @app.post( |
| 843 | "/api/reasoning_engine", |
nothing calls this directly
no test coverage detected