Drive the receive loop until the read stream closes. `task_status.started()` fires once `send_raw_request` is usable. Single-shot: once the loop ends the dispatcher stays closed and cannot be restarted.
(
self,
on_request: OnRequest,
on_notify: OnNotify,
*,
task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
)
| 427 | logger.debug("dropped %s: write stream closed", method) |
| 428 | |
| 429 | async def run( |
| 430 | self, |
| 431 | on_request: OnRequest, |
| 432 | on_notify: OnNotify, |
| 433 | *, |
| 434 | task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED, |
| 435 | ) -> None: |
| 436 | """Drive the receive loop until the read stream closes. |
| 437 | |
| 438 | `task_status.started()` fires once the task group is bound and `_running` is set, i.e. once `send_raw_request` is usable. |
| 439 | Each received item is handed to `_dispatch` along with `on_request` and `on_notify`. Single-shot: once the loop ends the dispatcher stays closed and cannot be restarted. |
| 440 | """ |
| 441 | try: |
| 442 | # Context managers exit LIFO: the task group is joined before the write stream closes, so teardown writes still land. |
| 443 | async with self._write_stream: |
| 444 | async with anyio.create_task_group() as tg: |
| 445 | self._tg = tg |
| 446 | self._running = True |
| 447 | task_status.started() |
| 448 | try: |
| 449 | async with self._read_stream: |
| 450 | try: |
| 451 | async for item in self._read_stream: |
| 452 | # Duck-typed lookup: only `ContextReceiveStream` carries the sender's |
| 453 | # per-message contextvars snapshot; any other stream yields None here. |
| 454 | sender_ctx: contextvars.Context | None = getattr( |
| 455 | self._read_stream, "last_context", None |
| 456 | ) |
| 457 | await self._dispatch(item, on_request, on_notify, sender_ctx) |
| 458 | except anyio.ClosedResourceError: |
| 459 | # Receive end closed under us (stateless SHTTP teardown); handled the same as EOF. |
| 460 | logger.debug("read stream closed by transport; treating as EOF") |
| 461 | # EOF: mark the dispatcher closed, then wake blocked `send_raw_request` waiters with CONNECTION_CLOSED. |
| 462 | self._running = False |
| 463 | self._closed = True |
| 464 | self._fan_out_closed() |
| 465 | finally: |
| 466 | # Cancel in-flight handlers through the task group's cancel scope; otherwise |
| 467 | # the task-group join waits on handlers whose callers are already gone. |
| 468 | tg.cancel_scope.cancel() |
| 469 | finally: |
| 470 | # Covers cancel/crash paths that skip the inline fan-out; idempotent, so repeating it after a clean EOF is harmless. |
| 471 | self._running = False |
| 472 | self._closed = True |
| 473 | self._tg = None |
| 474 | self._fan_out_closed() |
| 475 | await resync_tracer() |
| 476 | |
| 477 | async def _dispatch( |
| 478 | self, |