| 20 | type_key = "fan-out" |
| 21 | |
| 22 | def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: |
| 23 | items_expr = config.get("items", "[]") |
| 24 | items = evaluate_expression(items_expr, context) |
| 25 | max_concurrency = config.get("max_concurrency", 1) |
| 26 | step_template = config.get("step", {}) |
| 27 | |
| 28 | if not isinstance(items, list): |
| 29 | # A non-list here is a wiring error (the expression did not |
| 30 | # resolve to a collection); silently fanning out over zero |
| 31 | # items hides it. An explicit empty list remains valid input. |
| 32 | return StepResult( |
| 33 | status=StepStatus.FAILED, |
| 34 | error=( |
| 35 | f"Fan-out step {config.get('id', '?')!r}: 'items' must " |
| 36 | f"resolve to a list, got {type(items).__name__} from " |
| 37 | f"{items_expr!r}." |
| 38 | ), |
| 39 | output={ |
| 40 | "items": [], |
| 41 | "max_concurrency": max_concurrency, |
| 42 | "step_template": step_template, |
| 43 | "item_count": 0, |
| 44 | }, |
| 45 | ) |
| 46 | |
| 47 | return StepResult( |
| 48 | status=StepStatus.COMPLETED, |
| 49 | output={ |
| 50 | "items": items, |
| 51 | "max_concurrency": max_concurrency, |
| 52 | "step_template": step_template, |
| 53 | "item_count": len(items), |
| 54 | }, |
| 55 | ) |
| 56 | |
| 57 | def validate(self, config: dict[str, Any]) -> list[str]: |
| 58 | errors = super().validate(config) |