Create a WebChat side thread.
(
self,
creator: str,
parent_session_id: str,
parent_message_id: int,
base_checkpoint_id: str,
selected_text: str,
)
| 630 | return result.scalar_one_or_none() |
| 631 | |
async def create_webchat_thread(
    self,
    creator: str,
    parent_session_id: str,
    parent_message_id: int,
    base_checkpoint_id: str,
    selected_text: str,
) -> WebChatThread:
    """Persist a new WebChat side thread and return the stored record.

    Args:
        creator: Value stored in the thread's ``creator`` field.
        parent_session_id: Value stored in ``parent_session_id``.
        parent_message_id: Value stored in ``parent_message_id``.
        base_checkpoint_id: Value stored in ``base_checkpoint_id``.
        selected_text: Value stored in ``selected_text``.

    Returns:
        The ``WebChatThread`` instance that was added to the session,
        flushed and refreshed inside a single ``session.begin()`` block.
    """
    async with self.get_db() as db_session:
        db_session: AsyncSession
        async with db_session.begin():
            side_thread = WebChatThread(
                creator=creator,
                parent_session_id=parent_session_id,
                parent_message_id=parent_message_id,
                base_checkpoint_id=base_checkpoint_id,
                selected_text=selected_text,
            )
            db_session.add(side_thread)
            # Flush first so the pending row is sent to the database, then
            # refresh so the instance carries whatever the database filled in
            # (presumably the generated primary key / defaults -- the model
            # definition is not visible here, confirm against it).
            await db_session.flush()
            await db_session.refresh(side_thread)
    # Both context managers have exited by the time the caller gets the value.
    return side_thread
| 655 | |
| 656 | async def get_webchat_thread_by_id( |
| 657 | self, |
No direct callers of `create_webchat_thread` were found.
No test coverage was detected for `create_webchat_thread`.