(self, project_dir)
| 2790 | assert state.step_results["step-one"]["output"]["input"]["args"] == "login" |
| 2791 | |
| 2792 | def test_execute_with_gate_pauses(self, project_dir): |
| 2793 | from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition |
| 2794 | from specify_cli.workflows.base import RunStatus |
| 2795 | |
| 2796 | yaml_str = """ |
| 2797 | schema_version: "1.0" |
| 2798 | workflow: |
| 2799 | id: "gated" |
| 2800 | name: "Gated" |
| 2801 | version: "1.0.0" |
| 2802 | steps: |
| 2803 | - id: step-one |
| 2804 | type: shell |
| 2805 | run: "echo test" |
| 2806 | - id: gate |
| 2807 | type: gate |
| 2808 | message: "Review?" |
| 2809 | options: [approve, reject] |
| 2810 | on_reject: abort |
| 2811 | - id: step-two |
| 2812 | type: shell |
| 2813 | run: "echo done" |
| 2814 | """ |
| 2815 | definition = WorkflowDefinition.from_string(yaml_str) |
| 2816 | engine = WorkflowEngine(project_dir) |
| 2817 | state = engine.execute(definition) |
| 2818 | |
| 2819 | assert state.status == RunStatus.PAUSED |
| 2820 | assert "gate" in state.step_results |
| 2821 | assert state.step_results["gate"]["status"] == "paused" |
| 2822 | |
| 2823 | def test_execute_with_shell_step(self, project_dir): |
| 2824 | from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition |
nothing calls this directly
no test coverage detected