MCPcopy
hub / github.com/StructuredLabs/preswald / Workflow

Class Workflow

preswald/interfaces/workflow.py:195–601  ·  view source on GitHub ↗

Core workflow engine that manages registration and execution of reactive atoms.

Source from the content-addressed store, hash-verified

193 logger.debug(f"[CONTEXT] Set result for {atom_name} = {result.value}")
194
195 class Workflow:
196 """
197 Core workflow engine that manages registration and execution of reactive atoms.
198 """
199
200 def __init__(self, service: Optional["BasePreswaldService"] = None, default_retry_policy: Optional[RetryPolicy] = None):
201 self.atoms: dict[str, Atom] = {}
202 self.context = WorkflowContext()
203 self.default_retry_policy = default_retry_policy or RetryPolicy()
204 self.cache = AtomCache()
205 self._component_producers: dict[str, str] = {} # component_id -> atom_name
206 self._current_atom: str | None = None # currently executing atom
207 self._service = service
208 self._is_rerun = False
209 self._auto_atom_registry: dict[str, Callable] = {}
210 self._registered_reactive_atoms: list[Callable] = []
211
212 def atom(
213 self,
214 dependencies: list[str] | None = None,
215 retry_policy: RetryPolicy | None = None,
216 force_recompute: bool = False,
217 name: str | None = None,
218 ):
219 """
220 Decorator to manually register a function as a reactive atom in the workflow.
221
222 Atoms can be registered either explicitly using this decorator
223 or automatically through code transformation at runtime.
224
225 If dependencies are not explicitly provided, they will be inferred
226 from the function's parameter names.
227
228 TODO: provide example usage before PR comes out of draft
229
230 Args:
231 dependencies (list[str], optional):
232 Explicit list of atom names this atom depends on. If omitted, inferred from function arguments.
233 retry_policy (RetryPolicy, optional):
234 Custom retry policy to apply when this atom fails.
235 force_recompute (bool, optional):
236 If True, forces this atom to recompute even if inputs have not changed.
237 name (str, optional):
238 Custom name for the atom. Defaults to the function's name.
239 """
240 def decorator(func):
241 atom_name = name or func.__name__
242
243 if self._is_rerun and atom_name in self.atoms:
244 logger.debug(f"[workflow.atom] Skipping re-registration during rerun {atom_name=}")
245 return func
246
247 logger.info(f"[workflow.atom] Registered atom {atom_name=}")
248
249 # Use the unwrapped function to infer dependencies
250 raw_func = getattr(func, 'original_func', func)
251 inferred_deps = [
252 k for k, v in inspect.signature(raw_func).parameters.items()

Callers 3

hello.py — File · 0.90
workflow_dag_demo — Function · 0.90
__init__ — Method · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected