Join point that aggregates results from ``wait_for:`` steps. Reads completed step outputs from ``context.steps`` and collects them into ``output.results``. Does not block; relies on the engine executing steps sequentially.
| 9 | |
| 10 | |
| 11 | class FanInStep(StepBase): |
| 12 | """Join point that aggregates results from ``wait_for:`` steps. |
| 13 | |
| 14 | Reads completed step outputs from ``context.steps`` and collects |
| 15 | them into ``output.results``. Does not block; relies on the |
| 16 | engine executing steps sequentially. |
| 17 | """ |
| 18 | |
| 19 | type_key = "fan-in" |
| 20 | |
| 21 | def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: |
| 22 | wait_for = config.get("wait_for", []) |
| 23 | output_config = config.get("output") or {} |
| 24 | if not isinstance(output_config, dict): |
| 25 | output_config = {} |
| 26 | |
| 27 | # Collect results from referenced steps |
| 28 | results = [] |
| 29 | for step_id in wait_for: |
| 30 | step_data = context.steps.get(step_id, {}) |
| 31 | results.append(step_data.get("output", {})) |
| 32 | |
| 33 | # Resolve output expressions with fan_in in context |
| 34 | prev_fan_in = getattr(context, "fan_in", None) |
| 35 | context.fan_in = {"results": results} |
| 36 | resolved_output: dict[str, Any] = {"results": results} |
| 37 | |
| 38 | try: |
| 39 | for key, expr in output_config.items(): |
| 40 | if isinstance(expr, str) and "{{" in expr: |
| 41 | resolved_output[key] = evaluate_expression(expr, context) |
| 42 | else: |
| 43 | resolved_output[key] = expr |
| 44 | finally: |
| 45 | # Restore previous fan_in state even if evaluation fails |
| 46 | context.fan_in = prev_fan_in |
| 47 | |
| 48 | return StepResult( |
| 49 | status=StepStatus.COMPLETED, |
| 50 | output=resolved_output, |
| 51 | ) |
| 52 | |
| 53 | def validate(self, config: dict[str, Any]) -> list[str]: |
| 54 | errors = super().validate(config) |
| 55 | wait_for = config.get("wait_for", []) |
| 56 | if not isinstance(wait_for, list) or not wait_for: |
| 57 | errors.append( |
| 58 | f"Fan-in step {config.get('id', '?')!r}: " |
| 59 | f"'wait_for' must be a non-empty list of step IDs." |
| 60 | ) |
| 61 | return errors |
no outgoing calls