Create a new Platform session.
(
self,
creator: str,
platform_id: str = "webchat",
session_id: str | None = None,
display_name: str | None = None,
is_group: int = 0,
)
| 1617 | # ==== |
| 1618 | |
| 1619 | async def create_platform_session( |
| 1620 | self, |
| 1621 | creator: str, |
| 1622 | platform_id: str = "webchat", |
| 1623 | session_id: str | None = None, |
| 1624 | display_name: str | None = None, |
| 1625 | is_group: int = 0, |
| 1626 | ) -> PlatformSession: |
| 1627 | """Create a new Platform session.""" |
| 1628 | kwargs = {} |
| 1629 | if session_id: |
| 1630 | kwargs["session_id"] = session_id |
| 1631 | |
| 1632 | async with self.get_db() as session: |
| 1633 | session: AsyncSession |
| 1634 | async with session.begin(): |
| 1635 | new_session = PlatformSession( |
| 1636 | creator=creator, |
| 1637 | platform_id=platform_id, |
| 1638 | display_name=display_name, |
| 1639 | is_group=is_group, |
| 1640 | **kwargs, |
| 1641 | ) |
| 1642 | session.add(new_session) |
| 1643 | await session.flush() |
| 1644 | await session.refresh(new_session) |
| 1645 | return new_session |
| 1646 | |
| 1647 | async def get_platform_session_by_id( |
| 1648 | self, session_id: str |
nothing calls this directly
no test coverage detected