MCPcopy
hub / github.com/shareAI-lab/learn-claude-code / EventBus

Class EventBus

agents/s12_worktree_task_isolation.py:83–118  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

81
82# -- EventBus: append-only lifecycle events for observability --
83class EventBus:
84 def __init__(self, event_log_path: Path):
85 self.path = event_log_path
86 self.path.parent.mkdir(parents=True, exist_ok=True)
87 if not self.path.exists():
88 self.path.write_text("")
89
90 def emit(
91 self,
92 event: str,
93 task: dict | None = None,
94 worktree: dict | None = None,
95 error: str | None = None,
96 ):
97 payload = {
98 "event": event,
99 "ts": time.time(),
100 "task": task or {},
101 "worktree": worktree or {},
102 }
103 if error:
104 payload["error"] = error
105 with self.path.open("a", encoding="utf-8") as f:
106 f.write(json.dumps(payload) + "\n")
107
108 def list_recent(self, limit: int = 20) -> str:
109 n = max(1, min(int(limit or 20), 200))
110 lines = self.path.read_text(encoding="utf-8").splitlines()
111 recent = lines[-n:]
112 items = []
113 for line in recent:
114 try:
115 items.append(json.loads(line))
116 except Exception:
117 items.append({"event": "parse_error", "raw": line})
118 return json.dumps(items, indent=2)
119
120
121# -- TaskManager: persistent task board with optional worktree binding --

Callers 1

Calls

no outgoing calls

Tested by

no test coverage detected