(connection: sqlite3.Connection, scan_id: str)
| 1744 | |
| 1745 | |
| 1746 | def complete_scan_locked(connection: sqlite3.Connection, scan_id: str) -> dict[str, Any]: |
| 1747 | scan = require_scan(connection, scan_id) |
| 1748 | if scan["status"] == "complete": |
| 1749 | scan_dir = require_canonical_scan_directory(Path(scan["scan_dir"])) |
| 1750 | require_recorded_manifest_digest(scan, scan_dir) |
| 1751 | verify_manifest_binding(scan, read_json_object(scan_dir / ARTIFACTS["manifest"])) |
| 1752 | try: |
| 1753 | manifest, _, _ = finalize_scan( |
| 1754 | scan_dir, |
| 1755 | expected_coverage_mode=expected_coverage_mode(scan), |
| 1756 | ) |
| 1757 | except ContractError as exc: |
| 1758 | raise SystemExit(str(exc)) from exc |
| 1759 | verify_manifest_binding(scan, manifest) |
| 1760 | manifest_digest = published_manifest_digest(scan_dir, manifest) |
| 1761 | pin_legacy_manifest_digest(connection, scan["id"], manifest_digest) |
| 1762 | return scan_context(connection, scan["id"]) |
| 1763 | if scan["status"] != "running": |
| 1764 | raise SystemExit("Only a running scan can be completed.") |
| 1765 | require_unchanged_target(scan) |
| 1766 | scan_dir = require_canonical_scan_directory(Path(scan["scan_dir"])) |
| 1767 | manifest = read_json_object(scan_dir / ARTIFACTS["manifest"]) |
| 1768 | verify_manifest_binding(scan, manifest) |
| 1769 | try: |
| 1770 | manifest, findings, _ = finalize_scan( |
| 1771 | scan_dir, |
| 1772 | expected_coverage_mode=expected_coverage_mode(scan), |
| 1773 | ) |
| 1774 | except ContractError as exc: |
| 1775 | raise SystemExit(str(exc)) from exc |
| 1776 | artifacts = { |
| 1777 | kind: artifact_path(scan_dir, filename, required=True) |
| 1778 | for kind, filename in ARTIFACTS.items() |
| 1779 | } |
| 1780 | verify_manifest_binding(scan, manifest) |
| 1781 | manifest_digest = published_manifest_digest(scan_dir, manifest) |
| 1782 | require_unchanged_target(scan) |
| 1783 | connection.execute("BEGIN IMMEDIATE") |
| 1784 | try: |
| 1785 | timestamp = now() |
| 1786 | scan = require_scan(connection, scan["id"]) |
| 1787 | if scan["status"] == "complete": |
| 1788 | connection.commit() |
| 1789 | return scan_context(connection, scan["id"]) |
| 1790 | if scan["status"] != "running": |
| 1791 | raise SystemExit("Only a running scan can be completed.") |
| 1792 | connection.execute("DELETE FROM scan_artifacts WHERE scan_id = ?", (scan["id"],)) |
| 1793 | for kind, path in artifacts.items(): |
| 1794 | if path is not None: |
| 1795 | connection.execute( |
| 1796 | "INSERT INTO scan_artifacts (scan_id, kind, path, created_at) VALUES (?, ?, ?, ?)", |
| 1797 | (scan["id"], kind, str(path), timestamp), |
| 1798 | ) |
| 1799 | connection.execute("DELETE FROM finding_occurrences WHERE scan_id = ?", (scan["id"],)) |
| 1800 | index_findings(connection, scan["id"], findings, timestamp) |
| 1801 | finding_count = len(findings.get("findings", [])) |
| 1802 | connection.execute( |
| 1803 | """ |
no test coverage detected