Submit an inbound channel message and yield progress + final reply updates.
(self, message: InboundMessage, session_key: str)
| 210 | return bundle |
| 211 | |
| 212 | async def stream_message(self, message: InboundMessage, session_key: str): |
| 213 | """Submit an inbound channel message and yield progress + final reply updates.""" |
| 214 | user_message = _build_inbound_user_message(message) |
| 215 | user_prompt = user_message.text |
| 216 | command_prompt = (message.content or "").strip() |
| 217 | session_cwd = self._cwd_for_message(message) |
| 218 | bundle = await self.get_bundle(session_key, latest_user_prompt=user_prompt, cwd=session_cwd) |
| 219 | logger.info( |
| 220 | "ohmo runtime processing start channel=%s chat_id=%s session_key=%s session_id=%s content=%r", |
| 221 | message.channel, |
| 222 | message.chat_id, |
| 223 | session_key, |
| 224 | bundle.session_id, |
| 225 | _content_snippet(user_prompt), |
| 226 | ) |
| 227 | |
| 228 | command_context: CommandContext | None = None |
| 229 | |
| 230 | def get_command_context() -> CommandContext: |
| 231 | nonlocal command_context |
| 232 | if command_context is None: |
| 233 | command_context = CommandContext( |
| 234 | engine=bundle.engine, |
| 235 | hooks_summary=getattr(bundle, "hook_summary", lambda: "")(), |
| 236 | mcp_summary=getattr(bundle, "mcp_summary", lambda: "")(), |
| 237 | plugin_summary=getattr(bundle, "plugin_summary", lambda: "")(), |
| 238 | cwd=getattr(bundle, "cwd", str(self._cwd)), |
| 239 | tool_registry=getattr(bundle, "tool_registry", None), |
| 240 | app_state=getattr(bundle, "app_state", None), |
| 241 | session_backend=getattr(bundle, "session_backend", self._session_backend), |
| 242 | session_id=getattr(bundle, "session_id", None), |
| 243 | extra_skill_dirs=getattr(bundle, "extra_skill_dirs", ()), |
| 244 | extra_plugin_roots=getattr(bundle, "extra_plugin_roots", ()), |
| 245 | memory_backend=create_memory_command_backend(self._workspace), |
| 246 | include_project_memory=False, |
| 247 | ) |
| 248 | return command_context |
| 249 | |
| 250 | parsed = bundle.commands.lookup(command_prompt) |
| 251 | if parsed is None and not message.media: |
| 252 | parsed = lookup_skill_slash_command(command_prompt, get_command_context()) |
| 253 | if parsed is not None and not message.media: |
| 254 | command, args = parsed |
| 255 | command_name = str(getattr(command, "name", "") or "") |
| 256 | gateway_result = self._handle_gateway_scoped_command(command_name, args) |
| 257 | if gateway_result is not None: |
| 258 | message_text, refresh_runtime = gateway_result |
| 259 | result = CommandResult(message=message_text, refresh_runtime=refresh_runtime) |
| 260 | async for update in self._stream_command_result( |
| 261 | bundle=bundle, |
| 262 | message=message, |
| 263 | session_key=session_key, |
| 264 | user_prompt=user_prompt, |
| 265 | result=result, |
| 266 | ): |
| 267 | yield update |
| 268 | return |
| 269 | remote_allowed = getattr(command, "remote_invocable", True) |