MCPcopy Index your code
hub / github.com/github/spec-kit / _build

Method _build

tests/test_workflows.py:2107–2139  ·  view source on GitHub ↗

Wire an engine + run state to a probe step that echoes context.item. Per-item output is ``{"seen": }`` so order and per-thread item isolation are checkable. ``on_item(item)`` may run a side effect and optionally return a StepStatus to override COMPLETED (or raise).

(tmp_path, on_item=None)

Source from the content-addressed store, hash-verified

2105
2106 @staticmethod
2107 def _build(tmp_path, on_item=None):
2108 """Wire an engine + run state to a probe step that echoes context.item.
2109
2110 Per-item output is ``{"seen": <item>}`` so order and per-thread item
2111 isolation are checkable. ``on_item(item)`` may run a side effect and
2112 optionally return a StepStatus to override COMPLETED (or raise).
2113 """
2114 from specify_cli.workflows.base import (
2115 RunStatus,
2116 StepBase,
2117 StepContext,
2118 StepResult,
2119 StepStatus,
2120 )
2121 from specify_cli.workflows.engine import RunState, WorkflowEngine
2122
2123 class _ProbeStep(StepBase):
2124 type_key = "probe"
2125
2126 def execute(self, config, context):
2127 status = StepStatus.COMPLETED
2128 if on_item is not None:
2129 override = on_item(context.item)
2130 if override is not None:
2131 status = override
2132 return StepResult(status=status, output={"seen": context.item})
2133
2134 engine = WorkflowEngine(project_root=tmp_path)
2135 context = StepContext()
2136 state = RunState(run_id="r", workflow_id="w", project_root=tmp_path)
2137 state.status = RunStatus.RUNNING
2138 template = {"id": "impl", "type": "probe"}
2139 return engine, context, state, {"probe": _ProbeStep()}, template
2140
2141 def _run(self, tmp_path, items, max_concurrency, on_item=None):
2142 engine, context, state, registry, template = self._build(tmp_path, on_item)

Calls 4

WorkflowEngineClass · 0.90
StepContextClass · 0.90
RunStateClass · 0.90
_ProbeStepClass · 0.85

Tested by

no test coverage detected