MCPcopy
hub / github.com/commaai/openpilot / atomic_write

Function atomic_write

common/utils.py:103–114  ·  view source on GitHub ↗

Write to a file atomically using a temporary file in the same directory as the destination file.

(path: str, mode: str = 'w', buffering: int = -1, encoding: str | None = None, newline: str | None = None,
                 overwrite: bool = False)

Source from the content-addressed store, hash-verified

101
102@contextlib.contextmanager
103def atomic_write(path: str, mode: str = 'w', buffering: int = -1, encoding: str | None = None, newline: str | None = None,
104 overwrite: bool = False):
105 """Write to a file atomically using a temporary file in the same directory as the destination file."""
106 dir_name = os.path.dirname(path)
107
108 if not overwrite and os.path.exists(path):
109 raise FileExistsError(f"File '{path}' already exists. To overwrite it, set 'overwrite' to True.")
110
111 with tempfile.NamedTemporaryFile(mode=mode, buffering=buffering, encoding=encoding, newline=newline, dir=dir_name, delete=False) as tmp_file:
112 yield tmp_file
113 tmp_file_name = tmp_file.name
114 os.replace(tmp_file_name, path)
115
116
117def get_upload_stream(filepath: str, should_compress: bool) -> tuple[io.BufferedIOBase, int]:

Callers 5

prune_cache · Function · 0.90
get_length · Method · 0.90
read · Method · 0.90
main · Function · 0.90
manager_thread · Function · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected