(scan_id: str)
| 324 | |
@contextmanager
def scan_completion_lock(scan_id: str) -> Any:
    """Hold the per-scan completion lock for the duration of a ``with`` block.

    A lock file named ``<scan-id>.lock`` is opened (and created if missing)
    under ``state_dir() / "completion-locks"``. It is locked through
    ``acquire_completion_file_lock`` and, when the block exits normally or
    by exception, unlocked through ``release_completion_file_lock`` and
    closed.

    Args:
        scan_id: Identifier of the scan. It is passed through
            ``require_uuid(scan_id, 'scan-id')`` and the result is used as
            the stem of the lock file name.

    Yields:
        None. The lock is held while the caller's block runs.

    Raises:
        OSError: If the lock directory or lock file cannot be created or
            opened.

    Any exception raised by ``require_uuid`` or by the acquire/release
    helpers propagates unchanged.
    """
    # Build the file name before touching the filesystem, so that whatever
    # require_uuid does with a bad id happens before any directory is created.
    # NOTE(review): presumably require_uuid raises on a malformed id - confirm.
    lock_name = f"{require_uuid(scan_id, 'scan-id')}.lock"

    lock_dir = state_dir() / "completion-locks"
    lock_dir.mkdir(parents=True, exist_ok=True)

    descriptor = os.open(
        lock_dir / lock_name,
        # O_BINARY is not defined on every platform; fall back to no extra flag.
        os.O_RDWR | os.O_CREAT | getattr(os, "O_BINARY", 0),
        0o600,
    )
    # Track acquisition separately so a failed acquire never triggers a release.
    locked = False
    try:
        acquire_completion_file_lock(descriptor)
        locked = True
        yield
    finally:
        try:
            if locked:
                release_completion_file_lock(descriptor)
        finally:
            # Close the descriptor even if releasing the lock raised.
            os.close(descriptor)
| 346 | |
| 347 | |
| 348 | def is_file_lock_contention(error: OSError) -> bool: |
no test coverage detected