Validate an API key and return the associated user_id and org_id if valid. Returns: ApiKeyValidationResult if the key is valid, None otherwise. The org_id may be None for legacy API keys that weren't bound to an organization.
(self, api_key: str)
| 204 | return api_key |
| 205 | |
| 206 | async def validate_api_key(self, api_key: str) -> ApiKeyValidationResult | None: |
| 207 | """Validate an API key and return the associated user_id and org_id if valid. |
| 208 | |
| 209 | Returns: |
| 210 | ApiKeyValidationResult if the key is valid, None otherwise. |
| 211 | The org_id may be None for legacy API keys that weren't bound to an organization. |
| 212 | """ |
| 213 | now = datetime.now(UTC) |
| 214 | |
| 215 | async with a_session_maker() as session: |
| 216 | result = await session.execute(select(ApiKey).filter(ApiKey.key == api_key)) |
| 217 | key_record = result.scalars().first() |
| 218 | |
| 219 | if not key_record: |
| 220 | return None |
| 221 | |
| 222 | # expires_at is stored as naive UTC; re-attach tzinfo for comparison. |
| 223 | if key_record.expires_at: |
| 224 | expires_at = key_record.expires_at |
| 225 | if expires_at.tzinfo is None: |
| 226 | expires_at = expires_at.replace(tzinfo=UTC) |
| 227 | |
| 228 | if expires_at < now: |
| 229 | logger.info(f'API key has expired: {key_record.id}') |
| 230 | return None |
| 231 | |
| 232 | # Update last_used_at timestamp |
| 233 | await session.execute( |
| 234 | update(ApiKey) |
| 235 | .where(ApiKey.id == key_record.id) |
| 236 | .values(last_used_at=now.replace(tzinfo=None)) |
| 237 | ) |
| 238 | await session.commit() |
| 239 | |
| 240 | return ApiKeyValidationResult( |
| 241 | user_id=key_record.user_id, |
| 242 | org_id=key_record.org_id, |
| 243 | key_id=key_record.id, |
| 244 | key_name=key_record.name, |
| 245 | ) |
| 246 | |
| 247 | async def delete_api_key(self, api_key: str) -> bool: |
| 248 | """Delete an API key by the key value.""" |