Executes atoms in the workflow, with selective recomputation. Args: recompute_atoms: Optional set of atom names to force recomputation, regardless of cache status
(
self, recompute_atoms: set[str] | None = None
)
| 279 | return decorator |
| 280 | |
def execute(
    self, recompute_atoms: set[str] | None = None
) -> dict[str, AtomResult]:
    """
    Executes atoms in the workflow, with selective recomputation.

    The cache and the component producers are cleared on every call; the
    registered atoms themselves are kept. Atoms are visited in the order
    given by ``_get_execution_order`` and the run stops at the first atom
    whose result status is ``AtomStatus.FAILED``.

    Args:
        recompute_atoms: Optional set of atom names to force recomputation,
            regardless of cache status. When given and non-empty, only the
            atoms returned by ``_get_affected_atoms`` for this set are
            executed and every other atom is skipped. When ``None`` or
            empty, no atom is skipped.

    Returns:
        ``self.context.results`` as it stands once the loop has finished
        or has been halted by a failed atom.
    """
    self._is_rerun = True  # prevent duplicate re-registration
    try:
        # Clear caches and component producers, but not atoms
        self.cache.cache.clear()
        self._component_producers.clear()

        execution_order = self._get_execution_order()
        atoms_to_recompute = self._get_affected_atoms(recompute_atoms or set())

        logger.info(f"[DAG] Atoms to recompute {atoms_to_recompute=}")

        for atom_name in execution_order:
            # Selective mode: skip anything outside the affected set.
            if self._is_rerun and recompute_atoms and atom_name not in atoms_to_recompute:
                logger.info(f"[DAG] Skipping atom (not affected) {atom_name=}")
                continue

            atom = self.atoms[atom_name]
            if atom_name in atoms_to_recompute:
                atom.force_recompute = True

            try:
                result = self._execute_atom(atom)
                self.context.set_result(atom_name, result)
            finally:
                # Always drop the flag, even when execution raises, so a
                # failed run does not leave the atom permanently forced.
                atom.force_recompute = False

            if result.status == AtomStatus.FAILED:
                logger.error(f"[DAG] Execution halted due to failure {atom_name=}")
                break

        return self.context.results
    finally:
        self._is_rerun = False
| 322 | |
| 323 | def execute_relevant_atoms(self): |
| 324 | """ |
no test coverage detected