(
connection: sqlite3.Connection, scan_id: str, manifest_digest: str
)
| 1718 | |
| 1719 | |
def pin_legacy_manifest_digest(
    connection: sqlite3.Connection, scan_id: str, manifest_digest: str
) -> None:
    """Store ``manifest_digest`` on a scan that has none, or verify the stored one.

    The work runs inside a single ``BEGIN IMMEDIATE`` transaction:

    * the scan row is fetched through ``require_scan``;
    * if its ``seal_manifest_digest`` column is ``None``, the column is set to
      ``manifest_digest``;
    * if it already holds a value equal to ``manifest_digest``, nothing is
      written;
    * if it holds a different value, ``SystemExit`` is raised.

    Args:
        connection: Open SQLite connection; it is committed on success and
            rolled back on any failure.
        scan_id: Identifier handed to ``require_scan`` to locate the scan.
        manifest_digest: Digest to pin, or to compare against the stored one.

    Raises:
        SystemExit: The stored digest differs from ``manifest_digest``.
        BaseException: Anything raised by the lookup, the update, or the
            commit is re-raised unchanged after the rollback.
    """
    connection.execute("BEGIN IMMEDIATE")
    try:
        scan_row = require_scan(connection, scan_id)
        stored_digest = scan_row["seal_manifest_digest"]
        if stored_digest is None:
            # Nothing recorded yet: pin the supplied digest on this scan.
            connection.execute(
                "UPDATE scans SET seal_manifest_digest = ? WHERE id = ?",
                (manifest_digest, scan_row["id"]),
            )
        elif stored_digest != manifest_digest:
            raise SystemExit("The sealed scan manifest changed after completion.")
        # Commit stays inside the try so a failing commit is rolled back too.
        connection.commit()
    except BaseException:
        # BaseException rather than Exception: the SystemExit raised above
        # (and interrupts) must also roll the transaction back before
        # propagating to the caller.
        connection.rollback()
        raise
| 1738 | |
| 1739 | |
| 1740 | def complete_scan(connection: sqlite3.Connection, args: argparse.Namespace) -> dict[str, Any]: |
no test coverage detected