(
value: str | None,
*,
checked_target_path: str | None,
checked_mode: str,
)
| 513 | |
| 514 | |
| 515 | def capability_preflight_json( |
| 516 | value: str | None, |
| 517 | *, |
| 518 | checked_target_path: str | None, |
| 519 | checked_mode: str, |
| 520 | ) -> str | None: |
| 521 | normalized = optional_text(value) |
| 522 | if normalized is None: |
| 523 | return None |
| 524 | if len(normalized.encode("utf-8")) > MAX_CAPABILITY_PREFLIGHT_INPUT_JSON_BYTES: |
| 525 | raise SystemExit( |
| 526 | "Capability preflight must be no larger than " |
| 527 | f"{MAX_CAPABILITY_PREFLIGHT_INPUT_JSON_BYTES} bytes." |
| 528 | ) |
| 529 | try: |
| 530 | payload = json.loads(normalized, parse_constant=reject_nonstandard_json_number) |
| 531 | except (json.JSONDecodeError, ValueError) as exc: |
| 532 | raise SystemExit("Capability preflight must be valid JSON.") from exc |
| 533 | if not isinstance(payload, dict): |
| 534 | raise SystemExit("Capability preflight must be a JSON object.") |
| 535 | _require_object_keys( |
| 536 | payload, |
| 537 | required={"issues", "profile", "status"}, |
| 538 | optional={"remediation"}, |
| 539 | label="Capability preflight", |
| 540 | ) |
| 541 | profile = _bounded_preflight_text(payload.get("profile"), 128, "profile") |
| 542 | status = payload.get("status") |
| 543 | if status not in {"ready", "blocked", "incomplete"}: |
| 544 | raise SystemExit("Capability preflight status is invalid.") |
| 545 | issues = payload.get("issues") |
| 546 | if not isinstance(issues, list) or len(issues) > 32: |
| 547 | raise SystemExit("Capability preflight issues must be an array of at most 32 objects.") |
| 548 | normalized_issues: list[dict[str, str]] = [] |
| 549 | for index, issue in enumerate(issues): |
| 550 | if not isinstance(issue, dict): |
| 551 | raise SystemExit("Capability preflight issues must be an array of at most 32 objects.") |
| 552 | label = f"Capability preflight issue {index + 1}" |
| 553 | _require_object_keys( |
| 554 | issue, |
| 555 | required={"capability", "reason", "severity", "status"}, |
| 556 | optional=set(), |
| 557 | label=label, |
| 558 | ) |
| 559 | severity = issue.get("severity") |
| 560 | issue_status = issue.get("status") |
| 561 | if severity not in {"block", "warn", "suggest"} or issue_status not in { |
| 562 | "fail", |
| 563 | "unknown", |
| 564 | }: |
| 565 | raise SystemExit(f"{label} has an invalid severity or status.") |
| 566 | normalized_issues.append( |
| 567 | { |
| 568 | "capability": _bounded_preflight_text( |
| 569 | issue.get("capability"), 128, f"issue {index + 1} capability" |
| 570 | ), |
| 571 | "reason": _bounded_preflight_text( |
| 572 | issue.get("reason"), 1200, f"issue {index + 1} reason" |
no test coverage detected