MCPcopy
hub / github.com/IBM/AssetOpsBench / _json_events

Function _json_events

src/agent/opencode_agent/runner.py:219–249  ·  view source on GitHub ↗

Parse OpenCode's JSON-lines event stream, preserving non-JSON lines.

(stdout: str)

Source from the content-addressed store, hash-verified

217
218
219def _json_events(stdout: str) -> tuple[list[dict[str, Any]], list[str]]:
220 """Parse OpenCode's JSON-lines event stream, preserving non-JSON lines."""
221 stripped = stdout.strip()
222 if not stripped:
223 return [], []
224
225 try:
226 parsed = json.loads(stripped)
227 except json.JSONDecodeError:
228 parsed = None
229 if isinstance(parsed, list):
230 return [item for item in parsed if isinstance(item, dict)], []
231 if isinstance(parsed, dict):
232 return [parsed], []
233
234 events: list[dict[str, Any]] = []
235 plain_lines: list[str] = []
236 for line in stdout.splitlines():
237 line = line.strip()
238 if not line:
239 continue
240 try:
241 item = json.loads(line)
242 except json.JSONDecodeError:
243 plain_lines.append(line)
244 continue
245 if isinstance(item, dict):
246 events.append(item)
247 else:
248 plain_lines.append(line)
249 return events, plain_lines
250
251
252def _candidate_part(event: dict[str, Any]) -> dict[str, Any] | None:

Callers 2

runMethod · 0.85

Calls

no outgoing calls