(
self, cb: Callable[[], Any], is_internal: bool = False, title: str = None
)
| 546 | return payload |
| 547 | |
async def wrap_api_call(
    self, cb: Callable[[], Any], is_internal: bool = False, title: str = None
) -> Any:
    """Await ``cb`` while parsed call-site information is held in the API zone.

    When ``self._api_zone`` already holds a truthy value, ``cb`` is simply
    awaited and its result returned. Otherwise a stack is obtained, parsed,
    stored in ``self._api_zone`` for the duration of ``cb``, and the zone is
    set back to ``None`` afterwards regardless of the outcome.

    Args:
        cb: Zero-argument callable whose return value is awaited.
        is_internal: Passed through to the stack-trace parser.
        title: Passed through to the stack-trace parser.

    Returns:
        The value produced by awaiting ``cb()``.

    Raises:
        Whatever ``rewrite_error`` returns for an ``Exception`` raised by
        ``cb``; the message is prefixed with the parsed ``apiName`` and the
        implicit exception context is suppressed.
    """
    # A truthy zone value means call information is already recorded (this
    # method stores one below while cb runs), so just run the callback.
    if self._api_zone.get():
        return await cb()

    current_task = asyncio.current_task(self._loop)
    # Use a stack attached to the task when there is one; otherwise capture
    # the current stack here, requesting zero lines of source context.
    frames: List[inspect.FrameInfo] = (
        getattr(current_task, "__pw_stack__", None) or inspect.stack(0)
    )
    api_call_info = _extract_stack_trace_information_from_stack(
        frames, is_internal, title
    )

    self._api_zone.set(api_call_info)
    try:
        try:
            return await cb()
        except Exception as exc:
            # Re-raise with the API name in front of the original message.
            raise rewrite_error(
                exc, f"{api_call_info['apiName']}: {exc}"
            ) from None
    finally:
        # Always clear the zone so the next top-level call starts fresh.
        self._api_zone.set(None)
| 566 | |
| 567 | def wrap_api_call_sync( |
| 568 | self, cb: Callable[[], Any], is_internal: bool = False, title: str = None |
no test coverage detected