(self, project_dir)
| 2843 | assert "workflow-output" in state.step_results["echo"]["output"]["stdout"] |
| 2844 | |
def test_execute_with_if_then(self, project_dir):
    """Run a workflow with an ``if`` step whose condition is true.

    The workflow is executed with ``scope="full"``, which matches the
    condition ``inputs.scope == 'full'``. The run is expected to complete,
    with the ``then`` step (``full-tasks``) present in the step results and
    the ``else`` step (``partial-tasks``) absent.
    """
    from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
    from specify_cli.workflows.base import RunStatus

    # Two-branch workflow: `then` echoes "full", `else` echoes "partial".
    branching_yaml = """
schema_version: "1.0"
workflow:
  id: "branching"
  name: "Branching"
  version: "1.0.0"
inputs:
  scope:
    type: string
    default: "full"
steps:
  - id: check
    type: if
    condition: "{{ inputs.scope == 'full' }}"
    then:
      - id: full-tasks
        type: shell
        run: "echo full"
    else:
      - id: partial-tasks
        type: shell
        run: "echo partial"
"""
    workflow = WorkflowDefinition.from_string(branching_yaml)
    runner = WorkflowEngine(project_dir)
    run_state = runner.execute(workflow, {"scope": "full"})

    assert run_state.status == RunStatus.COMPLETED

    # Only the branch selected by the condition should have left a result.
    recorded_steps = run_state.step_results
    assert "full-tasks" in recorded_steps
    assert "partial-tasks" not in recorded_steps
| 2880 | def test_execute_missing_required_input(self, project_dir): |
| 2881 | from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition |
nothing calls this directly
no test coverage detected