Resume a paused or failed workflow run. When ``inputs`` is provided, the values are merged over the run's persisted inputs and re-resolved through the same typed validation path used by :meth:`execute`, so the resumed step sees updated workflow inputs. Keys not suppl
(
self,
run_id: str,
inputs: dict[str, Any] | None = None,
)
| 701 | return state |
| 702 | |
| 703 | def resume( |
| 704 | self, |
| 705 | run_id: str, |
| 706 | inputs: dict[str, Any] | None = None, |
| 707 | ) -> RunState: |
| 708 | """Resume a paused or failed workflow run. |
| 709 | |
| 710 | When ``inputs`` is provided, the values are merged over the run's |
| 711 | persisted inputs and re-resolved through the same typed validation |
| 712 | path used by :meth:`execute`, so the resumed step sees updated |
| 713 | workflow inputs. Keys not supplied keep their persisted values; an |
| 714 | empty/``None`` ``inputs`` leaves the run's inputs unchanged. |
| 715 | """ |
| 716 | state = RunState.load(run_id, self.project_root) |
| 717 | if state.status not in (RunStatus.PAUSED, RunStatus.FAILED): |
| 718 | msg = f"Cannot resume run {run_id!r} with status {state.status.value!r}." |
| 719 | raise ValueError(msg) |
| 720 | |
| 721 | # Load the workflow definition — try the persisted copy in the |
| 722 | # run directory first so resume works even if the original |
| 723 | # source (e.g. a local YAML path) is no longer available. |
| 724 | run_dir = self.project_root / ".specify" / "workflows" / "runs" / run_id |
| 725 | run_copy = run_dir / "workflow.yml" |
| 726 | if run_copy.exists(): |
| 727 | definition = WorkflowDefinition.from_yaml(run_copy) |
| 728 | else: |
| 729 | definition = self.load_workflow(state.workflow_id) |
| 730 | |
| 731 | # Merge any newly-supplied inputs over the persisted ones and |
| 732 | # re-validate through the same typing path as the initial run. |
| 733 | if inputs: |
| 734 | merged = {**state.inputs, **inputs} |
| 735 | state.inputs = self._resolve_inputs(definition, merged) |
| 736 | |
| 737 | # Restore context |
| 738 | context = StepContext( |
| 739 | inputs=state.inputs, |
| 740 | steps=state.step_results, |
| 741 | default_integration=definition.default_integration, |
| 742 | default_model=definition.default_model, |
| 743 | default_options=definition.default_options, |
| 744 | project_root=str(self.project_root), |
| 745 | run_id=state.run_id, |
| 746 | ) |
| 747 | |
| 748 | from . import STEP_REGISTRY |
| 749 | |
| 750 | state.status = RunStatus.RUNNING |
| 751 | state.save() |
| 752 | |
| 753 | # Resume from the current step — re-execute it so gates |
| 754 | # can prompt interactively again. |
| 755 | remaining_steps = definition.steps[state.current_step_index :] |
| 756 | step_offset = state.current_step_index |
| 757 | |
| 758 | try: |
| 759 | self._execute_steps( |
| 760 | remaining_steps, context, state, STEP_REGISTRY, |