MCPcopy
hub / github.com/saltstack/salt / atomic_rebuild

Method atomic_rebuild

salt/utils/mmap_cache.py:1614–1850  ·  view source on GitHub ↗

Rebuild the cache from an iterator of ``(key, value)`` or ``(key,)`` tuples. Writes fresh temporary index, heap, and roster files, then atomically swaps all three. Roster is swapped last so readers never see a new roster pointing into an old index.

(self, iterator)

Source from the content-addressed store, hash-verified

1612 }
1613
1614 def atomic_rebuild(self, iterator):
1615 """
1616 Rebuild the cache from an iterator of ``(key, value)`` or ``(key,)``
1617 tuples.
1618
1619 Writes fresh temporary index, heap, and roster files, then atomically
1620 swaps all three. Roster is swapped last so readers never see a new
1621 roster pointing into an old index.
1622 """
1623 os.makedirs(os.path.dirname(self.path), exist_ok=True)
1624
1625 tmp_dir = os.path.dirname(self.path)
1626 tmp_idx_fd, tmp_idx_path = tempfile.mkstemp(dir=tmp_dir, prefix=".mmcache_idx_")
1627 # Rebuild always starts with a single segment; further rolls happen
1628 # naturally if max_segment_bytes is exceeded during the write loop.
1629 tmp_heap_base = tmp_idx_path + ".heap"
1630 tmp_roster_path = tmp_idx_path + ".roster"
1631 # Used in ``except`` cleanup; must exist even if we fail before the
1632 # inner loop assigns the working segment counter.
1633 tmp_seg_id = 0
1634
1635 try:
1636 with self._thread_lock:
1637 with self._lock():
1638 total_idx = self.size * self.slot_size
1639 chunk = 1024 * 1024
1640 zeros = b"\x00" * min(chunk, total_idx)
1641 with os.fdopen(tmp_idx_fd, "wb") as f:
1642 written = 0
1643 while written < total_idx:
1644 to_write = min(chunk, total_idx - written)
1645 f.write(zeros[:to_write])
1646 written += to_write
1647 f.flush()
1648 os.fsync(f.fileno())
1649 tmp_idx_fd = -1
1650
1651 # seg_id and seg_pos track the current temporary segment.
1652 tmp_seg_id = 0
1653 tmp_seg_pos = 0 # bytes written to current tmp segment
1654 tmp_heap_segs = [tmp_heap_base] # paths of tmp heap segments
1655 occupied_count = 0
1656 hwm = 0
1657 roster_entries = []
1658
1659 heap_f = salt.utils.files.fopen( # pylint: disable=resource-leakage
1660 tmp_heap_base, "wb"
1661 )
1662 try:
1663 with salt.utils.files.fopen(tmp_idx_path, "r+b") as idx_f:
1664 mm = mmap.mmap(idx_f.fileno(), 0, access=mmap.ACCESS_WRITE)
1665 try:
1666 mm[0] = _HEADER_MAGIC
1667
1668 for item in iterator:
1669 if (
1670 isinstance(item, (list, tuple))
1671 and len(item) > 1

Calls 15

_lock · Method · 0.95
_pack_offset · Method · 0.95
_segment_path · Method · 0.95
_close_mmaps_and_fds · Method · 0.95
_fsync_fd_maybe · Function · 0.85
warning · Method · 0.80
write · Method · 0.45
flush · Method · 0.45
fileno · Method · 0.45
pack · Method · 0.45
close · Method · 0.45
append · Method · 0.45