(
browser_name: str, launch_arguments: Dict, wait_queue: "multiprocessing.Queue[str]"
)
| 68 | |
| 69 | |
| 70 | def _test_signals_sync( |
| 71 | browser_name: str, launch_arguments: Dict, wait_queue: "multiprocessing.Queue[str]" |
| 72 | ) -> None: |
| 73 | # On Windows, hint to mypy and pyright that they shouldn't check this function |
| 74 | if sys.platform == "win32": |
| 75 | return |
| 76 | |
| 77 | os.setpgrp() |
| 78 | sigint_received = False |
| 79 | |
| 80 | def my_sig_handler(signum: int, frame: Any) -> None: |
| 81 | nonlocal sigint_received |
| 82 | sigint_received = True |
| 83 | |
| 84 | signal.signal(signal.SIGINT, my_sig_handler) |
| 85 | |
| 86 | playwright = sync_playwright().start() |
| 87 | browser = playwright[browser_name].launch( |
| 88 | **launch_arguments, |
| 89 | handle_sigint=False, |
| 90 | ) |
| 91 | context = browser.new_context() |
| 92 | page = context.new_page() |
| 93 | notified = False |
| 94 | try: |
| 95 | while not sigint_received: |
| 96 | if not notified: |
| 97 | wait_queue.put("ready") |
| 98 | notified = True |
| 99 | page.wait_for_timeout(100) |
| 100 | finally: |
| 101 | wait_queue.put("close context") |
| 102 | context.close() |
| 103 | wait_queue.put("close browser") |
| 104 | browser.close() |
| 105 | wait_queue.put("close playwright") |
| 106 | playwright.stop() |
| 107 | wait_queue.put("all done") |
| 108 | |
| 109 | |
| 110 | def _create_signals_test( |
nothing calls this directly
no test coverage detected