(
self, cb: Callable[[], Any], is_internal: bool = False, title: str = None
)
| 565 | self._api_zone.set(None) |
| 566 | |
def wrap_api_call_sync(
    self, cb: Callable[[], Any], is_internal: bool = False, title: str = None
) -> Any:
    """Run ``cb`` synchronously while the API zone holds the parsed call site.

    If the API zone already holds a truthy value, ``cb`` is simply invoked
    and its result returned without touching the zone. Otherwise the call
    stack is parsed, stored in the zone for the duration of ``cb`` and the
    zone is reset to ``None`` afterwards, whether ``cb`` succeeded or not.

    Args:
        cb: Zero-argument callable to execute.
        is_internal: Forwarded to the stack-trace parser.
        title: Forwarded to the stack-trace parser.

    Returns:
        Whatever ``cb`` returns.

    Raises:
        Exception: Any ``Exception`` raised by ``cb`` is passed through
            ``rewrite_error`` with a message of the form
            ``"<apiName>: <error>"`` and re-raised with its context
            suppressed.
    """
    # Already inside a wrapped call: run the callback as-is.
    if self._api_zone.get():
        return cb()

    current = asyncio.current_task(self._loop)
    # Use the stack attached to the task when there is a non-empty one;
    # otherwise capture the current stack without source-context lines.
    frames: List[inspect.FrameInfo] = getattr(current, "__pw_stack__", None)
    if not frames:
        frames = inspect.stack(0)

    call_info = _extract_stack_trace_information_from_stack(
        frames, is_internal, title
    )
    self._api_zone.set(call_info)
    try:
        outcome = cb()
    except Exception as error:
        message = f"{call_info['apiName']}: {error}"
        raise rewrite_error(error, message) from None
    else:
        return outcome
    finally:
        # Always clear the zone so later calls are not treated as nested.
        self._api_zone.set(None)
| 584 | |
| 585 | |
| 586 | def from_channel(channel: Channel) -> Any: |
no test coverage detected