(connection: sqlite3.Connection, args: argparse.Namespace)
| 1455 | |
| 1456 | |
| 1457 | def start_scan(connection: sqlite3.Connection, args: argparse.Namespace) -> dict[str, Any]: |
| 1458 | workspace_id = require_uuid(args.workspace_id, "workspace-id") |
| 1459 | try: |
| 1460 | workspace = require_workspace(connection, workspace_id) |
| 1461 | if not workspace["submitted"] or not workspace["target_path"]: |
| 1462 | raise SystemExit("Save the Codex Security setup before starting the scan.") |
| 1463 | active = connection.execute( |
| 1464 | "SELECT * FROM scans WHERE workspace_id = ? AND status = 'running'", |
| 1465 | (workspace["id"],), |
| 1466 | ).fetchone() |
| 1467 | if active is not None: |
| 1468 | return workspace_state(connection, workspace["id"]) |
| 1469 | workspace_version = workspace["updated_at"] |
| 1470 | scan_id = str(uuid.uuid4()) |
| 1471 | timestamp = now() |
| 1472 | target = require_target(workspace["target_path"]) |
| 1473 | require_scannable_target(target) |
| 1474 | target_metadata = target.stat() |
| 1475 | scope = require_scope(workspace["default_scope"], workspace["default_mode"], target) |
| 1476 | diff_target = None |
| 1477 | if workspace["default_mode"] == "diff": |
| 1478 | diff_target = require_diff_target( |
| 1479 | target, |
| 1480 | workspace["diff_target_kind"], |
| 1481 | workspace["diff_base_revision"], |
| 1482 | workspace["diff_head_revision"], |
| 1483 | workspace["diff_content_digest"], |
| 1484 | ) |
| 1485 | root = ( |
| 1486 | Path(args.scan_root).expanduser().resolve() if args.scan_root else state_dir() / "scans" |
| 1487 | ) |
| 1488 | target_root = (root / safe_segment(target.name)).resolve() |
| 1489 | if target_root == target or target in target_root.parents: |
| 1490 | raise SystemExit("The scan artifact directory must be outside the selected target.") |
| 1491 | revision = diff_target["headRevision"] if diff_target else git_revision(target) |
| 1492 | target_snapshot_digest = None |
| 1493 | if diff_target is None: |
| 1494 | target_snapshot_digest = ( |
| 1495 | directory_content_digest(target) |
| 1496 | if revision == "unversioned" |
| 1497 | else worktree_content_digest(target) |
| 1498 | ) |
| 1499 | target_root.mkdir(parents=True, exist_ok=True) |
| 1500 | connection.execute("BEGIN IMMEDIATE") |
| 1501 | workspace = require_workspace(connection, workspace_id) |
| 1502 | active = connection.execute( |
| 1503 | "SELECT * FROM scans WHERE workspace_id = ? AND status = 'running'", |
| 1504 | (workspace["id"],), |
| 1505 | ).fetchone() |
| 1506 | if active is not None: |
| 1507 | connection.commit() |
| 1508 | return workspace_state(connection, workspace["id"]) |
| 1509 | if workspace["updated_at"] != workspace_version: |
| 1510 | raise SystemExit("Codex Security setup changed while the scan was starting. Try again.") |
| 1511 | current_target = require_remediation_target(str(target)) |
| 1512 | current_target_metadata = current_target.stat() |
| 1513 | if (current_target_metadata.st_dev, current_target_metadata.st_ino) != ( |
| 1514 | target_metadata.st_dev, |