MCPcopy
hub / github.com/HKUDS/DeepCode / MessageBus

Class MessageBus

nanobot/nanobot/bus/queue.py:11–81  ·  view source on GitHub ↗

Async message bus that decouples chat channels from the agent core. Channels push messages to the inbound queue, and the agent processes them and pushes responses to the outbound queue.

Source from the content-addressed store, hash-verified

9
10
11class MessageBus:
12 """
13 Async message bus that decouples chat channels from the agent core.
14
15 Channels push messages to the inbound queue, and the agent processes
16 them and pushes responses to the outbound queue.
17 """
18
19 def __init__(self):
20 self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
21 self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
22 self._outbound_subscribers: dict[
23 str, list[Callable[[OutboundMessage], Awaitable[None]]]
24 ] = {}
25 self._running = False
26
27 async def publish_inbound(self, msg: InboundMessage) -> None:
28 """Publish a message from a channel to the agent."""
29 await self.inbound.put(msg)
30
31 async def consume_inbound(self) -> InboundMessage:
32 """Consume the next inbound message (blocks until available)."""
33 return await self.inbound.get()
34
35 async def publish_outbound(self, msg: OutboundMessage) -> None:
36 """Publish a response from the agent to channels."""
37 await self.outbound.put(msg)
38
39 async def consume_outbound(self) -> OutboundMessage:
40 """Consume the next outbound message (blocks until available)."""
41 return await self.outbound.get()
42
43 def subscribe_outbound(
44 self, channel: str, callback: Callable[[OutboundMessage], Awaitable[None]]
45 ) -> None:
46 """Subscribe to outbound messages for a specific channel."""
47 if channel not in self._outbound_subscribers:
48 self._outbound_subscribers[channel] = []
49 self._outbound_subscribers[channel].append(callback)
50
51 async def dispatch_outbound(self) -> None:
52 """
53 Dispatch outbound messages to subscribed channels.
54 Run this as a background task.
55 """
56 self._running = True
57 while self._running:
58 try:
59 msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
60 subscribers = self._outbound_subscribers.get(msg.channel, [])
61 for callback in subscribers:
62 try:
63 await callback(msg)
64 except Exception as e:
65 logger.error(f"Error dispatching to {msg.channel}: {e}")
66 except asyncio.TimeoutError:
67 continue
68

Callers 2

gatewayFunction · 0.90
agentFunction · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected