(
*args: P.args, **kwargs: P.kwargs
)
| 514 | @asynccontextmanager |
| 515 | @wraps(fn) |
| 516 | async def context_manager( |
| 517 | *args: P.args, **kwargs: P.kwargs |
| 518 | ) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]: |
| 519 | send_chan, recv_chan = trio.open_memory_channel[T](0) |
| 520 | try: |
| 521 | async with trio.open_nursery(strict_exception_groups=True) as nursery: |
| 522 | agen = fn(*args, **kwargs) |
| 523 | send_semaphore = trio.Semaphore(0) |
| 524 | # `nursery.start` to make sure that we will clean up send_chan & agen |
| 525 | # If this errors we don't close `recv_chan`, but the caller |
| 526 | # never gets access to it, so that's not a problem. |
| 527 | await nursery.start( |
| 528 | _move_elems_to_channel, agen, send_chan, send_semaphore |
| 529 | ) |
| 530 | # `async with recv_chan` could eat exceptions, so use sync cm |
| 531 | with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan: |
| 532 | yield wrapped_recv_chan |
| 533 | # User has exited context manager, cancel to immediately close the |
| 534 | # abandoned generator if it's still alive. |
| 535 | nursery.cancel_scope.cancel( |
| 536 | "exited trio.as_safe_channel context manager" |
| 537 | ) |
| 538 | except BaseExceptionGroup as eg: |
| 539 | try: |
| 540 | raise_single_exception_from_group(eg) |
| 541 | except MultipleExceptionError: |
| 542 | # In case user has except* we make it possible for them to handle the |
| 543 | # exceptions. |
| 544 | if sys.version_info >= (3, 11): |
| 545 | eg.add_note( |
| 546 | "Encountered exception during cleanup of generator object, as " |
| 547 | "well as exception in the contextmanager body - unable to unwrap." |
| 548 | ) |
| 549 | |
| 550 | raise eg from None |
| 551 | |
| 552 | async def _move_elems_to_channel( |
| 553 | agen: AsyncGenerator[T, None], |
nothing calls this directly
no test coverage detected
searching dependent graphs…