Get the AgentCertificate for an identity. Note: The certificate is issued at creation time. This endpoint reconstructs the certificate from the stored identity record and the Org CA signature.
(
agent_id: str,
db: Annotated[AsyncSession, Depends(get_db)],
)
| 199 | description="Retrieve the signed certificate for an agent.", |
| 200 | ) |
| 201 | async def get_certificate( |
| 202 | agent_id: str, |
| 203 | db: Annotated[AsyncSession, Depends(get_db)], |
| 204 | ) -> AgentCertificate: |
| 205 | """ |
| 206 | Get the AgentCertificate for an identity. |
| 207 | |
| 208 | Note: The certificate is issued at creation time. This endpoint reconstructs |
| 209 | the certificate from the stored identity record and the Org CA signature. |
| 210 | """ |
| 211 | svc = AgentIdentityService(db) |
| 212 | identity = await svc.get_by_id(agent_id) |
| 213 | if identity is None: |
| 214 | raise HTTPException(status_code=404, detail=f"AgentIdentity not found: {agent_id}") |
| 215 | |
| 216 | # Reconstruct certificate from identity |
| 217 | # In production, the certificate should be stored separately |
| 218 | issued_at = identity.issued_at |
| 219 | valid_until = identity.valid_until |
| 220 | ca_fingerprint = identity.org_ca_fingerprint |
| 221 | public_key = identity.public_key |
| 222 | |
| 223 | # Build certificate payload and sign |
| 224 | cert_payload = { |
| 225 | "certificate_id": identity.agent_id, |
| 226 | "agent_id": identity.agent_id, |
| 227 | "agent_name": identity.agent_name, |
| 228 | "org_id": identity.org_id, |
| 229 | "issued_by": identity.issued_by, |
| 230 | "public_key": public_key, |
| 231 | "ca_fingerprint": ca_fingerprint, |
| 232 | "issued_at": issued_at.isoformat() if issued_at else None, |
| 233 | "valid_until": valid_until.isoformat() if valid_until else None, |
| 234 | } |
| 235 | |
| 236 | # Sign certificate payload using the public crypto_utils API |
| 237 | from orgkernel.crypto_utils import sign_agent_certificate |
| 238 | from orgkernel.crypto_utils import _ensure_ca_keypair |
| 239 | from cryptography.hazmat.primitives import serialization |
| 240 | |
| 241 | ca_private_key, _ = _ensure_ca_keypair() |
| 242 | pem_bytes = ca_private_key.private_bytes( |
| 243 | encoding=serialization.Encoding.PEM, |
| 244 | format=serialization.PrivateFormat.PKCS8, |
| 245 | encryption_algorithm=serialization.NoEncryption(), |
| 246 | ) |
| 247 | ca_signature = sign_agent_certificate(pem_bytes.decode("utf-8"), cert_payload) |
| 248 | |
| 249 | return AgentCertificate( |
| 250 | certificate_id=identity.agent_id, |
| 251 | agent_id=identity.agent_id, |
| 252 | agent_name=identity.agent_name, |
| 253 | org_id=identity.org_id, |
| 254 | issued_by=identity.issued_by, |
| 255 | public_key=public_key, |
| 256 | ca_fingerprint=ca_fingerprint, |
| 257 | ca_signature=ca_signature, |
| 258 | issued_at=issued_at, |