Async message bus that decouples chat channels from the agent core. Channels push messages to the inbound queue, and the agent processes them and pushes responses to the outbound queue.
| 9 | |
| 10 | |
| 11 | class MessageBus: |
| 12 | """ |
| 13 | Async message bus that decouples chat channels from the agent core. |
| 14 | |
| 15 | Channels push messages to the inbound queue, and the agent processes |
| 16 | them and pushes responses to the outbound queue. |
| 17 | """ |
| 18 | |
| 19 | def __init__(self): |
| 20 | self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue() |
| 21 | self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue() |
| 22 | self._outbound_subscribers: dict[ |
| 23 | str, list[Callable[[OutboundMessage], Awaitable[None]]] |
| 24 | ] = {} |
| 25 | self._running = False |
| 26 | |
| 27 | async def publish_inbound(self, msg: InboundMessage) -> None: |
| 28 | """Publish a message from a channel to the agent.""" |
| 29 | await self.inbound.put(msg) |
| 30 | |
| 31 | async def consume_inbound(self) -> InboundMessage: |
| 32 | """Consume the next inbound message (blocks until available).""" |
| 33 | return await self.inbound.get() |
| 34 | |
| 35 | async def publish_outbound(self, msg: OutboundMessage) -> None: |
| 36 | """Publish a response from the agent to channels.""" |
| 37 | await self.outbound.put(msg) |
| 38 | |
| 39 | async def consume_outbound(self) -> OutboundMessage: |
| 40 | """Consume the next outbound message (blocks until available).""" |
| 41 | return await self.outbound.get() |
| 42 | |
| 43 | def subscribe_outbound( |
| 44 | self, channel: str, callback: Callable[[OutboundMessage], Awaitable[None]] |
| 45 | ) -> None: |
| 46 | """Subscribe to outbound messages for a specific channel.""" |
| 47 | if channel not in self._outbound_subscribers: |
| 48 | self._outbound_subscribers[channel] = [] |
| 49 | self._outbound_subscribers[channel].append(callback) |
| 50 | |
| 51 | async def dispatch_outbound(self) -> None: |
| 52 | """ |
| 53 | Dispatch outbound messages to subscribed channels. |
| 54 | Run this as a background task. |
| 55 | """ |
| 56 | self._running = True |
| 57 | while self._running: |
| 58 | try: |
| 59 | msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0) |
| 60 | subscribers = self._outbound_subscribers.get(msg.channel, []) |
| 61 | for callback in subscribers: |
| 62 | try: |
| 63 | await callback(msg) |
| 64 | except Exception as e: |
| 65 | logger.error(f"Error dispatching to {msg.channel}: {e}") |
| 66 | except asyncio.TimeoutError: |
| 67 | continue |
| 68 |