(
self, entry: QueueEntry, executor: "BaseExecutor", **kwargs
)
| 99 | return results |
| 100 | |
def _reproduce_entry(
    self, entry: QueueEntry, executor: "BaseExecutor", **kwargs
) -> dict[str, dict[str, str]]:
    """Reproduce a single queue entry with ``executor`` in this process.

    A process-info file recording the current PID is written before the
    run and removed afterwards, whether or not the run succeeds.

    Args:
        entry: Queue entry to reproduce; its ``stash_rev`` identifies the
            experiment revision.
        executor: Executor used to run the reproduction.
        **kwargs: Optional settings. Only ``message`` is forwarded to
            ``executor.reproduce``; ``copy_paths`` is discarded.

    Returns:
        Mapping keyed by the entry's ``stash_rev``. It is populated from
        ``self.collect_executor`` only when the execution result carries a
        ``ref_info``; otherwise it is empty.

    Raises:
        DvcException: If the executor reports no experiment hash, or if any
            non-DVC exception is raised while reproducing (the original
            exception is chained as the cause).
    """
    from dvc_task.proc.process import ProcessInfo

    kwargs.pop("copy_paths", None)

    # Bind the revision before the ``try`` so the error handlers below can
    # always reference it.
    rev = entry.stash_rev
    results: dict[str, dict[str, str]] = defaultdict(dict)
    exec_name = self._EXEC_NAME or rev

    # Record the current process so this run can be identified by PID.
    proc_info_path = self._proc_info_path(exec_name)
    os.makedirs(os.path.dirname(proc_info_path), exist_ok=True)
    ProcessInfo(os.getpid(), None, None, None, None).dump(proc_info_path)

    infofile = self.get_infofile_path(exec_name)
    try:
        exec_result = executor.reproduce(
            info=executor.info,
            rev=rev,
            infofile=infofile,
            log_level=logger.getEffectiveLevel(),
            log_errors=not isinstance(executor, WorkspaceExecutor),
            message=kwargs.get("message"),
        )
        if not exec_result.exp_hash:
            raise DvcException(  # noqa: TRY301
                f"Failed to reproduce experiment '{rev[:7]}'"
            )
        if exec_result.ref_info:
            results[rev].update(
                self.collect_executor(self.repo.experiments, executor, exec_result)
            )
    except DvcException:
        raise
    except Exception as exc:
        raise DvcException(f"Failed to reproduce experiment '{rev[:7]}'") from exc
    finally:
        # Remove the process-info file even when executor cleanup itself
        # fails, so a stale PID file is never left behind.
        try:
            executor.cleanup(infofile)
        finally:
            remove(proc_info_path)
    return results
| 140 | |
| 141 | def _proc_info_path(self, name: str) -> str: |
| 142 | return os.path.join(self.pid_dir, name, f"{name}.json") |
no test coverage detected