(
self,
coro: Union[Coroutine[Any, Any, Any], Generator[Any, Any, Any]],
)
| 95 | return self._impl_obj.__str__() |
| 96 | |
def _sync(
    self,
    coro: Union[Coroutine[Any, Any, Any], Generator[Any, Any, Any]],
) -> Any:
    """Schedule ``coro`` on ``self._loop`` and return its result synchronously.

    The calling greenlet repeatedly switches to ``self._dispatcher_fiber``
    until the task created for ``coro`` reports that it is done; the task's
    done-callback switches back to the calling greenlet.

    Args:
        coro: The coroutine (or generator-based coroutine) to run.

    Returns:
        Whatever ``task.result()`` yields for the finished task.

    Raises:
        Error: If ``self._loop`` is already closed. ``coro`` is closed
            first so it is not left un-awaited.
        Exception: Anything re-raised by ``task.result()``.
    """
    # NOTE(review): presumably consumed by pytest to hide this frame from
    # reported tracebacks - confirm.
    __tracebackhide__ = True

    if self._loop.is_closed():
        coro.close()
        raise Error("Event loop is closed! Is Playwright already stopped?")

    caller_fiber = greenlet.getcurrent()
    task: asyncio.tasks.Task[Any] = self._loop.create_task(coro)

    # Attach the call-site stack to the task. These captures must stay in
    # this frame (not a helper) so the recorded frames are unchanged.
    setattr(task, "__pw_stack__", inspect.stack(0))
    setattr(task, "__pw_stack_trace__", traceback.extract_stack(limit=10))

    def _resume_caller(_finished: Any) -> Any:
        # Hand control back to the greenlet that is waiting on this task.
        return caller_fiber.switch()

    task.add_done_callback(_resume_caller)

    # Yield to the dispatcher fiber until the task has completed.
    while True:
        if task.done():
            break
        self._dispatcher_fiber.switch()

    # NOTE(review): re-registers self._loop as the running loop before the
    # result is read; presumably needed after returning from the dispatcher
    # fiber - confirm.
    asyncio._set_running_loop(self._loop)
    return task.result()
| 116 | |
| 117 | def _wrap_handler( |
| 118 | self, handler: Union[Callable[..., Any], Any] |
no test coverage detected