(self)
| 2032 | """Test the fan-in step type.""" |
| 2033 | |
def test_execute_collects_results(self):
    """Fan-in over one awaited step yields a single collected result."""
    from specify_cli.workflows.steps.fan_in import FanInStep
    from specify_cli.workflows.base import StepContext

    fan_in = FanInStep()

    # One upstream step ("parallel") has already produced an output.
    upstream_output = {"item_count": 2, "status": "done"}
    context = StepContext(steps={"parallel": {"output": upstream_output}})
    step_config = {"id": "collect", "wait_for": ["parallel"], "output": {}}

    outcome = fan_in.execute(step_config, context)

    # Exactly one entry is expected, carrying the upstream item_count.
    collected = outcome.output["results"]
    assert len(collected) == 1
    assert collected[0]["item_count"] == 2
| 2052 | |
| 2053 | def test_execute_multiple_wait_for(self): |
| 2054 | from specify_cli.workflows.steps.fan_in import FanInStep |
nothing calls this directly
no test coverage detected