Creates a Windows Job Object configured to terminate all its processes when closed.
()
| 172 | |
| 173 | |
def _create_job_object() -> object | None:
    """Create a Windows Job Object that kills its member processes when closed.

    The job is created unnamed, its extended limit information is read back,
    the ``JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`` flag is OR-ed into the existing
    limit flags, and the updated information is written to the job.

    Returns:
        The job handle on success. ``None`` when the platform is not
        ``win32``, when ``win32api`` or ``win32job`` is falsy (presumably
        because the optional pywin32 import failed -- confirm at the import
        site), or when any of the job calls raises ``pywintypes.error``.
    """
    # Same short-circuit order as a chain of `or`s: the pywin32 names are only
    # evaluated once the platform check has passed.
    if not (sys.platform == "win32" and win32api and win32job):
        return None

    handle = None
    try:
        handle = win32job.CreateJobObject(None, "")
        limits = win32job.QueryInformationJobObject(handle, win32job.JobObjectExtendedLimitInformation)

        # Preserve whatever flags are already set; only add kill-on-close.
        limits["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
        win32job.SetInformationJobObject(handle, win32job.JobObjectExtendedLimitInformation, limits)
    except pywintypes.error:
        logger.warning("Failed to create Job Object for process tree management", exc_info=True)
        # The job may exist even though configuring it failed; release the
        # handle here so it is not leaked.
        if handle is not None:
            _close_job_handle(handle)
        return None
    else:
        return handle
| 193 | |
| 194 | |
| 195 | def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: object | None) -> None: |
no test coverage detected