Create a new API key record.
(
self,
name: str,
key_hash: str,
key_prefix: str,
scopes: list[str] | None,
created_by: str,
expires_at: datetime | None = None,
)
| 825 | return result.rowcount |
| 826 | |
async def create_api_key(
    self,
    name: str,
    key_hash: str,
    key_prefix: str,
    scopes: list[str] | None,
    created_by: str,
    expires_at: datetime | None = None,
) -> ApiKey:
    """Persist a new API key record and return it.

    All arguments are forwarded unchanged to the ``ApiKey`` constructor
    under the same keyword names.

    Args:
        name: Value stored in the record's ``name`` field.
        key_hash: Value stored in the record's ``key_hash`` field.
        key_prefix: Value stored in the record's ``key_prefix`` field.
        scopes: Value stored in the record's ``scopes`` field; may be
            ``None``.
        created_by: Value stored in the record's ``created_by`` field.
        expires_at: Value stored in the record's ``expires_at`` field;
            defaults to ``None``.

    Returns:
        The ``ApiKey`` instance after it has been added to the session,
        flushed, and refreshed inside the transaction.
    """
    session: AsyncSession
    # One statement opens the session and then its transaction; both are
    # exited (transaction first) when the block is left.
    async with self.get_db() as session, session.begin():
        new_key = ApiKey(
            name=name,
            key_hash=key_hash,
            key_prefix=key_prefix,
            scopes=scopes,
            created_by=created_by,
            expires_at=expires_at,
        )
        session.add(new_key)
        # Flush first so the pending row is written, then refresh so the
        # instance is reloaded from the database before it is returned.
        await session.flush()
        await session.refresh(new_key)
        return new_key
| 852 | |
| 853 | async def list_api_keys(self) -> list[ApiKey]: |
| 854 | """List all API keys.""" |