原子写入 JSON 文件(持排他锁 + tmpfile rename)。 直接写入,不读取现有内容(避免 atomic_json_update 的多余读开销)。
(path: pathlib.Path, data: Any)
| 116 | |
| 117 | |
| 118 | def atomic_json_write(path: pathlib.Path, data: Any) -> None: |
| 119 | """原子写入 JSON 文件(持排他锁 + tmpfile rename)。 |
| 120 | 直接写入,不读取现有内容(避免 atomic_json_update 的多余读开销)。 |
| 121 | """ |
| 122 | lock_file = _lock_path(path) |
| 123 | lock_file.parent.mkdir(parents=True, exist_ok=True) |
| 124 | fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR) |
| 125 | try: |
| 126 | _lock_exclusive(fd) |
| 127 | tmp_fd, tmp_path = tempfile.mkstemp( |
| 128 | dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_' |
| 129 | ) |
| 130 | try: |
| 131 | with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f: |
| 132 | json.dump(data, f, ensure_ascii=False, indent=2) |
| 133 | os.replace(tmp_path, str(path)) |
| 134 | except Exception: |
| 135 | os.unlink(tmp_path) |
| 136 | raise |
| 137 | finally: |
| 138 | _unlock(fd) |
| 139 | os.close(fd) |