(
connection: sqlite3.Connection, args: argparse.Namespace
)
| 1937 | |
| 1938 | |
def release_handoff_delivery(
    connection: sqlite3.Connection, args: argparse.Namespace
) -> dict[str, Any]:
    """Release a claimed handoff on a scan and return the workspace state.

    Clears ``handoff_claimed_at`` and ``handoff_claim_token`` on the scan row
    and stamps ``updated_at``, but only when the row is still in the
    ``'pending'`` handoff status and its stored claim token equals the one
    supplied by the caller. When no row satisfies those conditions the UPDATE
    changes nothing and no error is raised here; the workspace state is
    returned either way.

    Args:
        connection: Open SQLite connection; used as a context manager so the
            lookup and the UPDATE run inside one transaction.
        args: Parsed CLI arguments. Reads ``args.scan_id`` and
            ``args.claim_token``.

    Returns:
        The result of ``workspace_state`` for the scan's ``workspace_id``.

    Raises:
        Whatever ``require_uuid``, ``require_handoff_claim_token`` or
        ``require_scan`` raise for unacceptable input (presumably validation /
        not-found errors -- confirm against those helpers).
    """
    # Validate both inputs before touching the database.
    validated_scan_id = require_uuid(args.scan_id, "scan-id")
    token = require_handoff_claim_token(args.claim_token)
    released_at = now()

    release_sql = """
        UPDATE scans
        SET handoff_claimed_at = NULL, handoff_claim_token = NULL, updated_at = ?
        WHERE id = ? AND handoff_status = 'pending'
        AND handoff_claim_token = ?
        """

    # sqlite3's connection context manager commits on success and rolls back
    # if the block raises.
    with connection:
        scan_row = require_scan(connection, validated_scan_id)
        # The token predicate means only the current claim holder can release.
        bindings = (released_at, scan_row["id"], token)
        connection.execute(release_sql, bindings)

    # NOTE(review): the captured source lost its indentation, so it is unclear
    # whether this return originally sat inside the ``with`` block -- confirm.
    return workspace_state(connection, scan_row["workspace_id"])
| 1957 | |
| 1958 | |
| 1959 | def mark_handoff_delivered( |
no test coverage detected