(self)
| 1957 | """Test the fan-out step type.""" |
| 1958 | |
def test_execute_with_items(self):
    """Run a fan-out step over a two-entry task list and check its summary.

    The context carries a ``tasks`` step whose output holds two entries;
    the step config refers to that list through its ``items`` expression.
    The test passes when the result output reports ``item_count`` of 2 and
    echoes the configured ``max_concurrency`` of 3.
    """
    from specify_cli.workflows.steps.fan_out import FanOutStep
    from specify_cli.workflows.base import StepContext

    # Output of the upstream "tasks" step that the fan-out iterates over.
    task_list = [
        {"file": "a.md"},
        {"file": "b.md"},
    ]
    context = StepContext(
        steps={"tasks": {"output": {"task_list": task_list}}},
    )

    fan_out_config = {
        "id": "parallel",
        # Presumably resolved against the context's steps -- the
        # item_count assertion below relies on it yielding both entries.
        "items": "{{ steps.tasks.output.task_list }}",
        "max_concurrency": 3,
        "step": {"id": "impl", "command": "speckit.implement"},
    }

    outcome = FanOutStep().execute(fan_out_config, context)
    summary = outcome.output

    assert summary["item_count"] == 2
    assert summary["max_concurrency"] == 3
| 1979 | |
| 1980 | def test_execute_non_list_items_fails_loudly(self): |
| 1981 | from specify_cli.workflows.steps.fan_out import FanOutStep |
nothing calls this directly
no test coverage detected