Core workflow engine that manages registration and execution of reactive atoms.
| 193 | logger.debug(f"[CONTEXT] Set result for {atom_name} = {result.value}") |
| 194 | |
| 195 | class Workflow: |
| 196 | """ |
| 197 | Core workflow engine that manages registration and execution of reactive atoms. |
| 198 | """ |
| 199 | |
| 200 | def __init__(self, service: Optional["BasePreswaldService"] = None, default_retry_policy: Optional[RetryPolicy] = None): |
| 201 | self.atoms: dict[str, Atom] = {} |
| 202 | self.context = WorkflowContext() |
| 203 | self.default_retry_policy = default_retry_policy or RetryPolicy() |
| 204 | self.cache = AtomCache() |
| 205 | self._component_producers: dict[str, str] = {} # maps component_id -> atom_name (presumably the atom that produced the component; confirm against where this dict is written) |
| 206 | self._current_atom: str | None = None # name of the currently executing atom; starts as None |
| 207 | self._service = service |
| 208 | self._is_rerun = False |
| 209 | self._auto_atom_registry: dict[str, Callable] = {} |
| 210 | self._registered_reactive_atoms: list[Callable] = [] |
| 211 | |
| 212 | def atom( |
| 213 | self, |
| 214 | dependencies: list[str] | None = None, |
| 215 | retry_policy: RetryPolicy | None = None, |
| 216 | force_recompute: bool = False, |
| 217 | name: str | None = None, |
| 218 | ): |
| 219 | """ |
| 220 | Decorator to manually register a function as a reactive atom in the workflow. |
| 221 | |
| 222 | Atoms can be registered either explicitly using this decorator |
| 223 | or automatically through code transformation at runtime. |
| 224 | |
| 225 | If dependencies are not explicitly provided, they will be inferred |
| 226 | from the function's parameter names. |
| 227 | |
| 228 | TODO: provide example usage before PR comes out of draft |
| 229 | |
| 230 | Args: |
| 231 | dependencies (list[str], optional): |
| 232 | Explicit list of atom names this atom depends on. If omitted, inferred from function arguments. |
| 233 | retry_policy (RetryPolicy, optional): |
| 234 | Custom retry policy to apply when this atom fails. |
| 235 | force_recompute (bool, optional): |
| 236 | If True, forces this atom to recompute even if inputs have not changed. |
| 237 | name (str, optional): |
| 238 | Custom name for the atom. Defaults to the function's name. |
| 239 | """ |
| 240 | def decorator(func): |
| 241 | atom_name = name or func.__name__ |
| 242 | |
| 243 | if self._is_rerun and atom_name in self.atoms: |
| 244 | logger.debug(f"[workflow.atom] Skipping re-registration during rerun {atom_name=}") |
| 245 | return func |
| 246 | |
| 247 | logger.info(f"[workflow.atom] Registered atom {atom_name=}") |
| 248 | |
| 249 | # Use the unwrapped function to infer dependencies |
| 250 | raw_func = getattr(func, 'original_func', func) |
| 251 | inferred_deps = [ |
| 252 | k for k, v in inspect.signature(raw_func).parameters.items() |
no outgoing calls
no test coverage detected