Execute top-level atoms (atoms with no dependencies). This mimics natural script execution by triggering leaf atoms, allowing dependencies to propagate automatically.
(self)
| 321 | self._is_rerun = False |
| 322 | |
def execute_relevant_atoms(self):
    """
    Run every atom that declares no dependencies.

    The names of all dependency-free atoms are collected up front. Each one
    is then looked up in ``self.atoms``, run through ``self._execute_atom``,
    and its result is stored on ``self.context`` under the atom's name.

    An ``Exception`` raised while handling one atom is logged as a warning
    (with traceback) and the loop moves on to the next atom.
    """
    # Snapshot the names first so the execution loop below walks a fixed list.
    top_level_atoms = []
    for name, candidate in self.atoms.items():
        if not candidate.dependencies:
            top_level_atoms.append(name)
    logger.debug(f"[workflow] Executing top-level atoms {top_level_atoms=}")

    for atom_name in top_level_atoms:
        try:
            logger.debug(f"[workflow] Triggering top-level atom {atom_name=}")
            # Resolve the atom by name at execution time, not at snapshot time.
            target = self.atoms[atom_name]
            outcome = self._execute_atom(target)
            self.context.set_result(atom_name, outcome)
        except Exception as e:
            # Best effort: report the failure and keep going with the rest.
            logger.warning(f"[workflow] Failed to execute top-level atom {atom_name=} {e=}", exc_info=True)
| 339 | |
| 340 | def get_component_producer(self, component_id: str) -> str | None: |
| 341 | """Return atom that produced a given component ID.""" |
no test coverage detected