MCPcopy
hub / github.com/HKUDS/OpenHarness / _run_command_flow

Function _run_command_flow

scripts/local_system_scenarios.py:181–219  ·  view source on GitHub ↗
(temp_root: Path)

Source from the content-addressed store, hash-verified

179
180
async def _run_command_flow(temp_root: Path) -> None:
    """Smoke-test a set of slash commands against a scratch workspace.

    Creates ``src/demo.py`` under *temp_root*, runs a batch of
    state-changing commands, then verifies that ``/doctor`` reports the
    output style that was just set and that several read-only commands
    return a message containing an expected substring.

    Args:
        temp_root: Directory used as the workspace for the command context.
            ``src`` must not already exist inside it, because it is created
            without ``exist_ok``.

    Raises:
        AssertionError: If ``/doctor`` does not report
            ``- output_style: minimal`` or if any checked command's message
            lacks its expected substring.
    """
    registry = create_default_command_registry()
    context = _make_command_context(temp_root)

    # Seed the workspace with one source file so "/files demo.py" has a match.
    src_dir = temp_root / "src"
    src_dir.mkdir()
    (src_dir / "demo.py").write_text("print('demo')\n", encoding="utf-8")

    # NOTE(review): assumes registry.lookup always returns a
    # (command, args) pair for these inputs -- confirm against the registry.
    # The results of these commands are intentionally not inspected here.
    for raw in [
        "/memory add Notes :: local command note",
        "/output-style set minimal",
        "/vim on",
        "/voice on",
        "/plan on",
        "/effort high",
        "/passes 2",
        "/tasks run printf 'local-command-task'",
        "/init",
    ]:
        command, args = registry.lookup(raw)
        await command.handler(args, context)

    doctor_command, doctor_args = registry.lookup("/doctor")
    doctor_result = await doctor_command.handler(doctor_args, context)
    # Guard against a missing message the same way the loop below does, so a
    # None message surfaces as an AssertionError rather than a TypeError.
    if "- output_style: minimal" not in (doctor_result.message or ""):
        raise AssertionError(doctor_result.message)

    for raw, expected in [
        ("/files demo.py", "src/demo.py"),
        ("/session", "Session directory:"),
        ("/session tag local-smoke", "local-smoke.json"),
        ("/bridge show", "Bridge summary:"),
        ("/privacy-settings", "Privacy settings:"),
        ("/rate-limit-options", "Rate limit options:"),
        ("/release-notes", "Release Notes"),
        ("/upgrade", "Upgrade instructions:"),
    ]:
        command, args = registry.lookup(raw)
        result = await command.handler(args, context)
        if expected not in (result.message or ""):
            raise AssertionError(f"{raw} failed: {result.message}")

    print("[commands] PASS")
220
221
222async def main() -> int:

Callers 1

mainFunction · 0.70

Calls 4

_make_command_contextFunction · 0.85
lookupMethod · 0.80
handlerMethod · 0.45

Tested by

no test coverage detected