(self, func: Callable)
| 1593 | super().__init__(parent, type, guid, initializer) |
| 1594 | |
async def call(self, func: Callable) -> None:
    """Run a binding callback and report its outcome over the channel.

    ``func`` is invoked with a ``source`` dict (``context``, ``page`` and
    ``frame`` entries) followed by either the object resolved from the
    initializer's ``handle`` entry or, when that entry is missing or falsy,
    the parsed values of the initializer's ``args`` entry.

    If the call returns a coroutine it is awaited first. The final value is
    serialized and sent as a ``resolve`` message. Any ``Exception`` raised
    along the way (including by the ``resolve`` send itself) is serialized
    together with its traceback and sent as a ``reject`` message.
    """
    try:
        init = self._initializer
        frame = from_channel(init["frame"])
        page = frame._page
        source = {"context": page.context, "page": page, "frame": frame}

        handle = init.get("handle")
        if handle:
            outcome = func(source, from_channel(handle))
        else:
            parsed_args = [parse_result(raw) for raw in init["args"]]
            outcome = func(source, *parsed_args)

        # Only coroutine objects are awaited; any other return value is
        # serialized as-is.
        if inspect.iscoroutine(outcome):
            outcome = await outcome

        payload = {"result": serialize_argument(outcome)}
        await self._channel.send("resolve", None, payload)
    except Exception as e:
        active_traceback = sys.exc_info()[2]
        rejection = {"error": {"error": serialize_error(e, active_traceback)}}
        # Fire-and-forget: the reject message is scheduled as a task and is
        # not awaited here.
        asyncio.create_task(self._channel.send("reject", None, rejection))
| 1616 | |
| 1617 | |
| 1618 | def trim_url(param: Union[URLMatchRequest, URLMatchResponse]) -> Optional[str]: |
no test coverage detected