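Generate and store a cryptographic challenge for an agent. The returned ChallengeRequest.nonce must be sent to the agent. The agent signs the nonce with its private key and calls /challenge/verify. The challenge lifetime is set by ttl_seconds (default 300, accepted range 60–3600). The nonce parameter is optional; when the caller supplies one, this handler passes it to the service without checking it, so it needs to be unpredictable and unique per challenge, because a guessable or repeated nonce would allow an earlier signature to be replayed.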
(
agent_id: str,
issued_by: str,
db: Annotated[AsyncSession, Depends(get_db)],
nonce: str | None = None,
ttl_seconds: int = Query(default=300, ge=60, le=3600),
)
| 283 | description="Step 4 of PKI: verifier requests a cryptographic challenge for an agent.", |
| 284 | ) |
| 285 | async def request_challenge( |
| 286 | agent_id: str, |
| 287 | issued_by: str, |
| 288 | db: Annotated[AsyncSession, Depends(get_db)], |
| 289 | nonce: str | None = None, |
| 290 | ttl_seconds: int = Query(default=300, ge=60, le=3600), |
| 291 | ) -> ChallengeRequest: |
| 292 | """ |
| 293 | Generate and store a cryptographic challenge for an agent. |
| 294 | |
| 295 | The returned ChallengeRequest.nonce must be sent to the agent. |
| 296 | The agent signs the nonce with its private key and calls /challenge/verify. |
| 297 | """ |
| 298 | svc = AgentIdentityService(db) |
| 299 | identity = await svc.get_by_id(agent_id) |
| 300 | if identity is None: |
| 301 | raise HTTPException(status_code=404, detail=f"AgentIdentity not found: {agent_id}") |
| 302 | return await svc.request_challenge(agent_id, issued_by, nonce=nonce, ttl_seconds=ttl_seconds) |
| 303 | |
| 304 | |
| 305 | @_identity.post( |
nothing calls this directly
no test coverage detected