Rebuild the cache from an iterator of ``(key, value)`` or ``(key,)`` tuples. Writes fresh temporary index, heap, and roster files, then atomically swaps all three. Roster is swapped last so readers never see a new roster pointing into an old index.
(self, iterator)
| 1612 | } |
| 1613 | |
| 1614 | def atomic_rebuild(self, iterator): |
| 1615 | """ |
| 1616 | Rebuild the cache from an iterator of ``(key, value)`` or ``(key,)`` |
| 1617 | tuples. |
| 1618 | |
| 1619 | Writes fresh temporary index, heap, and roster files, then atomically |
| 1620 | swaps all three. Roster is swapped last so readers never see a new |
| 1621 | roster pointing into an old index. |
| 1622 | """ |
| 1623 | os.makedirs(os.path.dirname(self.path), exist_ok=True) |
| 1624 | |
| 1625 | tmp_dir = os.path.dirname(self.path) |
| 1626 | tmp_idx_fd, tmp_idx_path = tempfile.mkstemp(dir=tmp_dir, prefix=".mmcache_idx_") |
| 1627 | # Rebuild always starts with a single segment; further rolls happen |
| 1628 | # naturally if max_segment_bytes is exceeded during the write loop. |
| 1629 | tmp_heap_base = tmp_idx_path + ".heap" |
| 1630 | tmp_roster_path = tmp_idx_path + ".roster" |
| 1631 | # Used in ``except`` cleanup; must exist even if we fail before the |
| 1632 | # inner loop assigns the working segment counter. |
| 1633 | tmp_seg_id = 0 |
| 1634 | |
| 1635 | try: |
| 1636 | with self._thread_lock: |
| 1637 | with self._lock(): |
| 1638 | total_idx = self.size * self.slot_size |
| 1639 | chunk = 1024 * 1024 |
| 1640 | zeros = b"\x00" * min(chunk, total_idx) |
| 1641 | with os.fdopen(tmp_idx_fd, "wb") as f: |
| 1642 | written = 0 |
| 1643 | while written < total_idx: |
| 1644 | to_write = min(chunk, total_idx - written) |
| 1645 | f.write(zeros[:to_write]) |
| 1646 | written += to_write |
| 1647 | f.flush() |
| 1648 | os.fsync(f.fileno()) |
| 1649 | tmp_idx_fd = -1 |
| 1650 | |
| 1651 | # seg_id and seg_pos track the current temporary segment. |
| 1652 | tmp_seg_id = 0 |
| 1653 | tmp_seg_pos = 0 # bytes written to current tmp segment |
| 1654 | tmp_heap_segs = [tmp_heap_base] # paths of tmp heap segments |
| 1655 | occupied_count = 0 |
| 1656 | hwm = 0 |
| 1657 | roster_entries = [] |
| 1658 | |
| 1659 | heap_f = salt.utils.files.fopen( # pylint: disable=resource-leakage |
| 1660 | tmp_heap_base, "wb" |
| 1661 | ) |
| 1662 | try: |
| 1663 | with salt.utils.files.fopen(tmp_idx_path, "r+b") as idx_f: |
| 1664 | mm = mmap.mmap(idx_f.fileno(), 0, access=mmap.ACCESS_WRITE) |
| 1665 | try: |
| 1666 | mm[0] = _HEADER_MAGIC |
| 1667 | |
| 1668 | for item in iterator: |
| 1669 | if ( |
| 1670 | isinstance(item, (list, tuple)) |
| 1671 | and len(item) > 1 |