(self, sender: str, to: str, content: str,
msg_type: str = "message", extra: dict = None)
| 366 | INBOX_DIR.mkdir(parents=True, exist_ok=True) |
| 367 | |
def send(self, sender: str, to: str, content: str,
         msg_type: str = "message", extra: dict = None) -> str:
    """Append one message to the recipient's inbox file.

    The message is a JSON object with the keys ``type``, ``from``,
    ``content`` and ``timestamp`` (the current ``time.time()`` value).
    It is written as a single line to ``INBOX_DIR / "<to>.jsonl"``,
    which is opened in append mode.

    Args:
        sender: Stored under the ``from`` key.
        to: Recipient name; selects the ``<to>.jsonl`` inbox file.
        content: Stored under the ``content`` key.
        msg_type: Stored under the ``type`` key.
        extra: Optional additional fields. When truthy they are merged
            into the message after the base keys are set, so a
            colliding key replaces the base value.

    Returns:
        A confirmation string naming the message type and recipient.
    """
    payload = {
        "type": msg_type,
        "from": sender,
        "content": content,
        "timestamp": time.time(),
    }
    if extra:
        payload.update(extra)

    inbox_path = INBOX_DIR / f"{to}.jsonl"
    with open(inbox_path, "a") as inbox:
        # One JSON document per line (JSON Lines).
        inbox.write(json.dumps(payload) + "\n")

    return f"Sent {msg_type} to {to}"
| 376 | |
| 377 | def read_inbox(self, name: str) -> list: |
| 378 | path = INBOX_DIR / f"{name}.jsonl" |
no test coverage detected