MCPcopy
hub / github.com/HKUDS/OpenHarness / stream_message

Method stream_message

ohmo/gateway/runtime.py:212–313  ·  view source on GitHub ↗

Submit an inbound channel message and yield progress + final reply updates.

(self, message: InboundMessage, session_key: str)

Source from the content-addressed store, hash-verified

210 return bundle
211
212 async def stream_message(self, message: InboundMessage, session_key: str):
213 """Submit an inbound channel message and yield progress + final reply updates."""
214 user_message = _build_inbound_user_message(message)
215 user_prompt = user_message.text
216 command_prompt = (message.content or "").strip()
217 session_cwd = self._cwd_for_message(message)
218 bundle = await self.get_bundle(session_key, latest_user_prompt=user_prompt, cwd=session_cwd)
219 logger.info(
220 "ohmo runtime processing start channel=%s chat_id=%s session_key=%s session_id=%s content=%r",
221 message.channel,
222 message.chat_id,
223 session_key,
224 bundle.session_id,
225 _content_snippet(user_prompt),
226 )
227
228 command_context: CommandContext | None = None
229
230 def get_command_context() -> CommandContext:
231 nonlocal command_context
232 if command_context is None:
233 command_context = CommandContext(
234 engine=bundle.engine,
235 hooks_summary=getattr(bundle, "hook_summary", lambda: "")(),
236 mcp_summary=getattr(bundle, "mcp_summary", lambda: "")(),
237 plugin_summary=getattr(bundle, "plugin_summary", lambda: "")(),
238 cwd=getattr(bundle, "cwd", str(self._cwd)),
239 tool_registry=getattr(bundle, "tool_registry", None),
240 app_state=getattr(bundle, "app_state", None),
241 session_backend=getattr(bundle, "session_backend", self._session_backend),
242 session_id=getattr(bundle, "session_id", None),
243 extra_skill_dirs=getattr(bundle, "extra_skill_dirs", ()),
244 extra_plugin_roots=getattr(bundle, "extra_plugin_roots", ()),
245 memory_backend=create_memory_command_backend(self._workspace),
246 include_project_memory=False,
247 )
248 return command_context
249
250 parsed = bundle.commands.lookup(command_prompt)
251 if parsed is None and not message.media:
252 parsed = lookup_skill_slash_command(command_prompt, get_command_context())
253 if parsed is not None and not message.media:
254 command, args = parsed
255 command_name = str(getattr(command, "name", "") or "")
256 gateway_result = self._handle_gateway_scoped_command(command_name, args)
257 if gateway_result is not None:
258 message_text, refresh_runtime = gateway_result
259 result = CommandResult(message=message_text, refresh_runtime=refresh_runtime)
260 async for update in self._stream_command_result(
261 bundle=bundle,
262 message=message,
263 session_key=session_key,
264 user_prompt=user_prompt,
265 result=result,
266 ):
267 yield update
268 return
269 remote_allowed = getattr(command, "remote_invocable", True)

Calls 12

_cwd_for_messageMethod · 0.95
get_bundleMethod · 0.95
_remote_admin_allowedMethod · 0.95
CommandResultClass · 0.90
lookupMethod · 0.80
_content_snippetFunction · 0.70
handlerMethod · 0.45