MCPcopy
hub / github.com/github/spec-kit / execute

Method execute

src/specify_cli/workflows/steps/fan_out/__init__.py:22–55  ·  view source on GitHub ↗
(self, config: dict[str, Any], context: StepContext)

Source from the content-addressed store, hash-verified

20 type_key = "fan-out"
21
def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
    """Resolve the fan-out inputs from ``config`` and report them.

    Reads three keys from ``config``: ``items`` (an expression, default
    ``"[]"``) which is evaluated against ``context``; ``max_concurrency``
    (default ``1``); and ``step`` (default ``{}``).

    Returns:
        A ``COMPLETED`` result whose output holds the resolved list, the
        concurrency value, the step template and the number of items; or
        a ``FAILED`` result with an explanatory error and an empty item
        list when the expression does not evaluate to a ``list``.
    """
    expression = config.get("items", "[]")
    resolved = evaluate_expression(expression, context)
    concurrency = config.get("max_concurrency", 1)
    template = config.get("step", {})

    if isinstance(resolved, list):
        # An explicitly empty list is valid input and still completes.
        return StepResult(
            status=StepStatus.COMPLETED,
            output={
                "items": resolved,
                "max_concurrency": concurrency,
                "step_template": template,
                "item_count": len(resolved),
            },
        )

    # Anything other than a list means the expression was wired up
    # incorrectly; fail loudly instead of quietly fanning out over
    # nothing, which would mask the mistake.
    step_id = config.get("id", "?")
    actual_type = type(resolved).__name__
    message = (
        f"Fan-out step {step_id!r}: 'items' must resolve to a list, "
        f"got {actual_type} from {expression!r}."
    )
    return StepResult(
        status=StepStatus.FAILED,
        error=message,
        output={
            "items": [],
            "max_concurrency": concurrency,
            "step_template": template,
            "item_count": 0,
        },
    )
56
57 def validate(self, config: dict[str, Any]) -> list[str]:
58 errors = super().validate(config)

Calls 3

evaluate_expressionFunction · 0.90
StepResultClass · 0.90
getMethod · 0.45