(
self,
*,
bundle: RuntimeBundle,
message: InboundMessage,
session_key: str,
user_prompt: str,
user_message: ConversationMessage | str,
)
| 388 | await self._save_snapshot(bundle, session_key, user_prompt) |
| 389 | |
| 390 | async def _stream_engine_message( |
| 391 | self, |
| 392 | *, |
| 393 | bundle: RuntimeBundle, |
| 394 | message: InboundMessage, |
| 395 | session_key: str, |
| 396 | user_prompt: str, |
| 397 | user_message: ConversationMessage | str, |
| 398 | ): |
| 399 | bundle.engine.set_system_prompt(self._runtime_system_prompt(bundle, user_prompt)) |
| 400 | reply_parts: list[str] = [] |
| 401 | yield GatewayStreamUpdate( |
| 402 | kind="progress", |
| 403 | text=_format_channel_progress( |
| 404 | channel=message.channel, |
| 405 | kind="thinking", |
| 406 | text="Thinking...", |
| 407 | session_key=session_key, |
| 408 | content=user_prompt, |
| 409 | ), |
| 410 | metadata={"_progress": True, "_session_key": session_key}, |
| 411 | ) |
| 412 | previous_group_request = self._set_group_request_context(bundle, message, session_key) |
| 413 | try: |
| 414 | async for event in bundle.engine.submit_message(user_message): |
| 415 | if isinstance(event, ErrorEvent) and _should_retry_without_image_input( |
| 416 | event.message, |
| 417 | bundle.engine.messages, |
| 418 | ): |
| 419 | logger.warning( |
| 420 | "ohmo runtime image input rejected; retrying without image blocks session_key=%s session_id=%s message=%r", |
| 421 | session_key, |
| 422 | bundle.session_id, |
| 423 | _content_snippet(event.message), |
| 424 | ) |
| 425 | _strip_image_blocks_from_engine_history(bundle.engine) |
| 426 | yield GatewayStreamUpdate( |
| 427 | kind="progress", |
| 428 | text=_format_channel_progress( |
| 429 | channel=message.channel, |
| 430 | kind="image_fallback", |
| 431 | text=event.message, |
| 432 | session_key=session_key, |
| 433 | content=user_prompt, |
| 434 | ), |
| 435 | metadata={"_progress": True, "_session_key": session_key, "_image_fallback": True}, |
| 436 | ) |
| 437 | async for retry_event in bundle.engine.continue_pending(max_turns=bundle.engine.max_turns): |
| 438 | async for update in self._convert_stream_event( |
| 439 | event=retry_event, |
| 440 | bundle=bundle, |
| 441 | message=message, |
| 442 | session_key=session_key, |
| 443 | content=user_prompt, |
| 444 | reply_parts=reply_parts, |
| 445 | ): |
| 446 | yield update |
| 447 | break |
no test coverage detected