Wire an engine + run state to a probe step that echoes context.item. Per-item output is ``{"seen": }`` so order and per-thread item isolation are checkable. ``on_item(item)`` may run a side effect and optionally return a StepStatus to override COMPLETED (or raise).
(tmp_path, on_item=None)
| 2105 | |
| 2106 | @staticmethod |
| 2107 | def _build(tmp_path, on_item=None): |
| 2108 | """Wire an engine + run state to a probe step that echoes context.item. |
| 2109 | |
| 2110 | Per-item output is ``{"seen": <item>}`` so order and per-thread item |
| 2111 | isolation are checkable. ``on_item(item)`` may run a side effect and |
| 2112 | optionally return a StepStatus to override COMPLETED (or raise). |
| 2113 | """ |
| 2114 | from specify_cli.workflows.base import ( |
| 2115 | RunStatus, |
| 2116 | StepBase, |
| 2117 | StepContext, |
| 2118 | StepResult, |
| 2119 | StepStatus, |
| 2120 | ) |
| 2121 | from specify_cli.workflows.engine import RunState, WorkflowEngine |
| 2122 | |
| 2123 | class _ProbeStep(StepBase): |
| 2124 | type_key = "probe" |
| 2125 | |
| 2126 | def execute(self, config, context): |
| 2127 | status = StepStatus.COMPLETED |
| 2128 | if on_item is not None: |
| 2129 | override = on_item(context.item) |
| 2130 | if override is not None: |
| 2131 | status = override |
| 2132 | return StepResult(status=status, output={"seen": context.item}) |
| 2133 | |
| 2134 | engine = WorkflowEngine(project_root=tmp_path) |
| 2135 | context = StepContext() |
| 2136 | state = RunState(run_id="r", workflow_id="w", project_root=tmp_path) |
| 2137 | state.status = RunStatus.RUNNING |
| 2138 | template = {"id": "impl", "type": "probe"} |
| 2139 | return engine, context, state, {"probe": _ProbeStep()}, template |
| 2140 | |
| 2141 | def _run(self, tmp_path, items, max_concurrency, on_item=None): |
| 2142 | engine, context, state, registry, template = self._build(tmp_path, on_item) |
no test coverage detected