| 424 | |
| 425 | |
class APIKeyService:
    """Lookup, update and delete operations for ``ApiKey`` rows.

    Every query goes through the session handed to the constructor.
    Lookups by access key are wrapped by ``locked_cached``; the mutating
    methods drop the matching cache entry through ``delete_cache_by_key``.
    """

    def __init__(self, session: AsyncSession):
        """Keep a reference to the session used by all methods."""
        self.session = session

    @locked_cached()
    async def get_by_access_key(self, access_key: str) -> Optional[ApiKey]:
        """Return the ``ApiKey`` whose ``access_key`` matches, or ``None``.

        A found row is expunged from the session before it is returned.
        """
        api_key = await ApiKey.one_by_field(self.session, "access_key", access_key)
        if api_key is None:
            return None
        # NOTE(review): presumably detached so the cached instance can
        # outlive this session -- confirm against ``locked_cached``.
        self.session.expunge(api_key)
        return api_key

    async def get_by_user_id(self, user_id: int) -> List[ApiKey]:
        """Return every ``ApiKey`` owned by ``user_id``.

        Each returned row is expunged from the session. An empty list is
        returned when the underlying query yields ``None``.
        """
        api_keys = await ApiKey.all_by_field(self.session, "user_id", user_id)
        if api_keys is None:
            return []
        for api_key in api_keys:
            self.session.expunge(api_key)
        return api_keys

    async def update(self, api_key: ApiKey, source: Union[dict, SQLModel, None] = None):
        """Apply ``source`` to ``api_key`` and invalidate its cached lookup.

        Returns whatever ``api_key.update`` returns.
        """
        # Remember the key the cache entry was stored under *before* the
        # update: if ``source`` changes ``access_key``, the entry for the
        # old value would otherwise be left behind.
        previous_access_key = api_key.access_key
        result = await api_key.update(self.session, source)
        await delete_cache_by_key(self.get_by_access_key, previous_access_key)

        current_access_key = api_key.access_key
        if current_access_key != previous_access_key:
            await delete_cache_by_key(self.get_by_access_key, current_access_key)
        return result

    async def delete(self, api_key: ApiKey):
        """Delete ``api_key`` and invalidate its cached lookup.

        Returns whatever ``api_key.delete`` returns.
        """
        # Read the key before deleting: attribute access on an instance
        # whose row is gone may fail if the session expires it on commit.
        # NOTE(review): depends on session configuration -- confirm.
        access_key = api_key.access_key
        result = await api_key.delete(self.session)
        await delete_cache_by_key(self.get_by_access_key, access_key)
        return result
| 455 | |
| 456 | |
| 457 | class ClusterService: |
no outgoing calls
no test coverage detected