hub / github.com/shareAI-lab/learn-claude-code / MessageBus

Class MessageBus

s19_mcp_plugin/code.py:319–338

class MessageBus:
    def send(self, from_agent: str, to_agent: str, content: str,
             msg_type: str = "message", metadata: dict = None):
        msg = {"from": from_agent, "to": to_agent,
               "content": content, "type": msg_type,
               "ts": time.time(), "metadata": metadata or {}}
        inbox = MAILBOX_DIR / f"{to_agent}.jsonl"
        with open(inbox, "a") as f:
            f.write(json.dumps(msg) + "\n")
        print(f" \033[33m[bus] {from_agent} → {to_agent}: "
              f"({msg_type}) {content[:50]}\033[0m")

    def read_inbox(self, agent: str) -> list[dict]:
        inbox = MAILBOX_DIR / f"{agent}.jsonl"
        if not inbox.exists():
            return []
        msgs = [json.loads(line) for line in inbox.read_text().splitlines()
                if line.strip()]
        inbox.unlink()
        return msgs


BUS = MessageBus()
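
The class above appends one JSON object per line to a per-agent `.jsonl` file and drains that file on read. A minimal sketch of that round trip follows. It assumes `MAILBOX_DIR` is a `pathlib.Path` directory (its real definition lives elsewhere in `code.py` and is not shown here, so a temporary directory stands in for it), uses a condensed copy of the class with the console logging left out, and the agent names `lead` and `worker` are hypothetical.

```python
import json
import tempfile
import time
from pathlib import Path

# Assumption: the real MAILBOX_DIR is defined elsewhere in code.py.
# A throwaway temp directory keeps this sketch self-contained.
MAILBOX_DIR = Path(tempfile.mkdtemp(prefix="mailbox_"))


class MessageBus:
    """Condensed copy of the class above (console logging omitted)."""

    def send(self, from_agent, to_agent, content,
             msg_type="message", metadata=None):
        msg = {"from": from_agent, "to": to_agent,
               "content": content, "type": msg_type,
               "ts": time.time(), "metadata": metadata or {}}
        # Append-only write: one JSON object per line (JSON Lines).
        with open(MAILBOX_DIR / f"{to_agent}.jsonl", "a") as f:
            f.write(json.dumps(msg) + "\n")

    def read_inbox(self, agent):
        inbox = MAILBOX_DIR / f"{agent}.jsonl"
        if not inbox.exists():
            return []
        msgs = [json.loads(line) for line in inbox.read_text().splitlines()
                if line.strip()]
        inbox.unlink()  # reading is destructive: the inbox file is removed
        return msgs


bus = MessageBus()
bus.send("lead", "worker", "run the tests")
bus.send("lead", "worker", "then report back", msg_type="task",
         metadata={"priority": 1})

first_read = bus.read_inbox("worker")   # both messages, in send order
second_read = bus.read_inbox("worker")  # file was deleted by the first read

print([m["content"] for m in first_read])  # ['run the tests', 'then report back']
print(second_read)                         # []
```

Because `read_inbox` deletes the file after parsing it, each message is delivered at most once, and a message appended between `read_text()` and `unlink()` would be lost. That is likely acceptable for a single-process teaching example, but worth keeping in mind if several agents write concurrently.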

Callers 1

code.py · File · 0.70

Calls

no outgoing calls

Tested by

no test coverage detected