持锁读取 JSON 文件。
(path: pathlib.Path, default: Any = None)
| 61 | |
| 62 | |
def atomic_json_read(path: pathlib.Path, default: Any = None) -> Any:
    """Read and parse a JSON file while holding its lock.

    The lock file returned by ``_lock_path(path)`` is created if necessary
    (including its parent directories), locked with ``_lock_shared`` for the
    duration of the read, and released afterwards.

    Args:
        path: JSON file to read (decoded as UTF-8).
        default: Value returned when the file is missing or cannot be read
            or parsed.

    Returns:
        The decoded JSON value, or ``default`` on any read/parse failure.

    Raises:
        Whatever creating/opening the lock file or ``_lock_shared`` raises;
        only failures of the read/parse step are mapped to ``default``.
    """
    lock_file = _lock_path(path)
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR)
    try:
        _lock_shared(fd)
        try:
            return json.loads(path.read_text(encoding='utf-8'))
        except Exception:
            # Deliberately best-effort: a missing, unreadable or malformed
            # file all yield the caller's default instead of propagating.
            return default
        finally:
            # Only reached once the lock was actually acquired.
            _unlock(fd)
    finally:
        # Always release the descriptor, even if locking or unlocking raised.
        os.close(fd)
| 77 | |
| 78 | |
| 79 | def atomic_json_update( |