(self)
| 143 | |
| 144 | @property |
| 145 | def _active_pid(self) -> Optional[int]: |
| 146 | from dvc_task.proc.process import ProcessInfo |
| 147 | |
| 148 | assert self._EXEC_NAME |
| 149 | name = self._EXEC_NAME |
| 150 | try: |
| 151 | proc_info = ProcessInfo.load(self._proc_info_path(name)) |
| 152 | pid = proc_info.pid |
| 153 | if psutil.pid_exists(pid): |
| 154 | return pid |
| 155 | logger.debug("Workspace exec PID '%d' no longer exists, removing.", pid) |
| 156 | remove(self._proc_info_path(name)) |
| 157 | except (FileNotFoundError, json.JSONDecodeError): |
| 158 | pass |
| 159 | return None |
| 160 | |
| 161 | @staticmethod |
| 162 | def collect_executor( |
nothing calls this directly
no test coverage detected