(connection: sqlite3.Connection, args: argparse.Namespace)
| 1996 | |
| 1997 | |
| 1998 | def set_finding_triage(connection: sqlite3.Connection, args: argparse.Namespace) -> dict[str, Any]: |
| 1999 | close_reason = args.close_reason |
| 2000 | if args.status == "open" and close_reason is not None: |
| 2001 | raise SystemExit("An open finding cannot keep a close reason.") |
| 2002 | if args.status == "closed" and close_reason is None: |
| 2003 | raise SystemExit("Choose why this finding is being closed.") |
| 2004 | note = optional_text(args.note, maximum=2400) |
| 2005 | if close_reason == "wont_fix" and note is None: |
| 2006 | raise SystemExit("Explain why this finding will not be fixed.") |
| 2007 | connection.execute("BEGIN IMMEDIATE") |
| 2008 | try: |
| 2009 | timestamp = now() |
| 2010 | occurrence = require_occurrence(connection, args.occurrence_id) |
| 2011 | if args.status == "closed": |
| 2012 | remediation = connection.execute( |
| 2013 | """ |
| 2014 | SELECT * |
| 2015 | FROM finding_remediation_attempts |
| 2016 | WHERE occurrence_id = ? |
| 2017 | ORDER BY created_at DESC, rowid DESC |
| 2018 | LIMIT 1 |
| 2019 | """, |
| 2020 | (occurrence["id"],), |
| 2021 | ).fetchone() |
| 2022 | if remediation is not None and remediation["pending_action"] is not None: |
| 2023 | raise SystemExit( |
| 2024 | "Wait for the pending remediation operation to finish before closing this finding." |
| 2025 | ) |
| 2026 | if ( |
| 2027 | close_reason == "already_fixed" |
| 2028 | and remediation is not None |
| 2029 | and remediation["state"] == "verified" |
| 2030 | ): |
| 2031 | scan = require_scan(connection, occurrence["scan_id"]) |
| 2032 | require_remediation_checkout_unchanged( |
| 2033 | scan, |
| 2034 | remediation, |
| 2035 | require_applied_content=True, |
| 2036 | ) |
| 2037 | connection.execute( |
| 2038 | """ |
| 2039 | INSERT INTO finding_triage (occurrence_id, status, close_reason, note, updated_at) |
| 2040 | VALUES (?, ?, ?, ?, ?) |
| 2041 | ON CONFLICT(occurrence_id) DO UPDATE SET |
| 2042 | status = excluded.status, |
| 2043 | close_reason = excluded.close_reason, |
| 2044 | note = excluded.note, |
| 2045 | updated_at = excluded.updated_at |
| 2046 | """, |
| 2047 | (occurrence["id"], args.status, close_reason, note, timestamp), |
| 2048 | ) |
| 2049 | connection.commit() |
| 2050 | except BaseException: |
| 2051 | connection.rollback() |
| 2052 | raise |
| 2053 | return scan_context(connection, occurrence["scan_id"]) |
| 2054 | |
| 2055 |
no test coverage detected