MCPcopy
hub / github.com/HKUDS/OpenHarness / _stream_engine_message

Method _stream_engine_message

ohmo/gateway/runtime.py:390–483  ·  view source on GitHub ↗
(
        self,
        *,
        bundle: RuntimeBundle,
        message: InboundMessage,
        session_key: str,
        user_prompt: str,
        user_message: ConversationMessage | str,
    )

Source from the content-addressed store, hash-verified

388 await self._save_snapshot(bundle, session_key, user_prompt)
389
390 async def _stream_engine_message(
391 self,
392 *,
393 bundle: RuntimeBundle,
394 message: InboundMessage,
395 session_key: str,
396 user_prompt: str,
397 user_message: ConversationMessage | str,
398 ):
399 bundle.engine.set_system_prompt(self._runtime_system_prompt(bundle, user_prompt))
400 reply_parts: list[str] = []
401 yield GatewayStreamUpdate(
402 kind="progress",
403 text=_format_channel_progress(
404 channel=message.channel,
405 kind="thinking",
406 text="Thinking...",
407 session_key=session_key,
408 content=user_prompt,
409 ),
410 metadata={"_progress": True, "_session_key": session_key},
411 )
412 previous_group_request = self._set_group_request_context(bundle, message, session_key)
413 try:
414 async for event in bundle.engine.submit_message(user_message):
415 if isinstance(event, ErrorEvent) and _should_retry_without_image_input(
416 event.message,
417 bundle.engine.messages,
418 ):
419 logger.warning(
420 "ohmo runtime image input rejected; retrying without image blocks session_key=%s session_id=%s message=%r",
421 session_key,
422 bundle.session_id,
423 _content_snippet(event.message),
424 )
425 _strip_image_blocks_from_engine_history(bundle.engine)
426 yield GatewayStreamUpdate(
427 kind="progress",
428 text=_format_channel_progress(
429 channel=message.channel,
430 kind="image_fallback",
431 text=event.message,
432 session_key=session_key,
433 content=user_prompt,
434 ),
435 metadata={"_progress": True, "_session_key": session_key, "_image_fallback": True},
436 )
437 async for retry_event in bundle.engine.continue_pending(max_turns=bundle.engine.max_turns):
438 async for update in self._convert_stream_event(
439 event=retry_event,
440 bundle=bundle,
441 message=message,
442 session_key=session_key,
443 content=user_prompt,
444 reply_parts=reply_parts,
445 ):
446 yield update
447 break

Callers 2

stream_messageMethod · 0.95

Tested by

no test coverage detected