MCPcopy
hub / github.com/cft0808/edict / atomic_json_update

Function atomic_json_update

scripts/file_lock.py:79–115  ·  view source on GitHub ↗

原子地读取 → 修改 → 写回 JSON 文件。 modifier(data) 应返回修改后的数据。 使用临时文件 + rename 保证写入原子性。

(
    path: pathlib.Path,
    modifier: Callable[[Any], Any],
    default: Any = None,
)

Source from the content-addressed store, hash-verified

77
78
79def atomic_json_update(
80 path: pathlib.Path,
81 modifier: Callable[[Any], Any],
82 default: Any = None,
83) -> Any:
84 """
85 原子地读取 → 修改 → 写回 JSON 文件。
86 modifier(data) 应返回修改后的数据。
87 使用临时文件 + rename 保证写入原子性。
88 """
89 lock_file = _lock_path(path)
90 lock_file.parent.mkdir(parents=True, exist_ok=True)
91 fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR)
92 try:
93 _lock_exclusive(fd)
94 # Read
95 try:
96 data = json.loads(path.read_text(encoding='utf-8')) if path.exists() else default
97 except Exception:
98 data = default
99 # Modify
100 result = modifier(data)
101 # Atomic write via temp file + rename
102 tmp_fd, tmp_path = tempfile.mkstemp(
103 dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_'
104 )
105 try:
106 with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f:
107 json.dump(result, f, ensure_ascii=False, indent=2)
108 os.replace(tmp_path, str(path))
109 except Exception:
110 os.unlink(tmp_path)
111 raise
112 return result
113 finally:
114 _unlock(fd)
115 os.close(fd)
116
117
118def atomic_json_write(path: pathlib.Path, data: Any) -> None:

Callers 15

_append_auditFunction · 0.90
cmd_createFunction · 0.90
cmd_stateFunction · 0.90
cmd_flowFunction · 0.90
cmd_doneFunction · 0.90
cmd_blockFunction · 0.90
cmd_confirmFunction · 0.90
cmd_progressFunction · 0.90
cmd_todoFunction · 0.90
cmd_memoryFunction · 0.90
cmd_task_memoFunction · 0.90
cmd_shared_memoFunction · 0.90

Calls 5

_lock_pathFunction · 0.85
_lock_exclusiveFunction · 0.85
modifierFunction · 0.85
_unlockFunction · 0.85
closeMethod · 0.80

Tested by 2

test_update_creates_fileFunction · 0.72