Iterates through all archive items. Missing item chunks will be skipped and the msgpack stream will be restarted.
(archive)
| 2036 | ) |
| 2037 | |
| 2038 | def robust_iterator(archive): |
| 2039 | """Iterates through all archive items |
| 2040 | |
| 2041 | Missing item chunks will be skipped and the msgpack stream will be restarted |
| 2042 | """ |
| 2043 | item_keys = self.manifest.item_keys |
| 2044 | required_item_keys = REQUIRED_ITEM_KEYS |
| 2045 | unpacker = RobustUnpacker( |
| 2046 | lambda item: isinstance(item, StableDict) and "path" in item, self.manifest.item_keys |
| 2047 | ) |
| 2048 | _state = 0 |
| 2049 | |
| 2050 | def missing_chunk_detector(chunk_id): |
| 2051 | nonlocal _state |
| 2052 | if _state % 2 != int(chunk_id not in self.chunks): |
| 2053 | _state += 1 |
| 2054 | return _state |
| 2055 | |
| 2056 | def report(msg, chunk_id, chunk_no): |
| 2057 | cid = bin_to_hex(chunk_id) |
| 2058 | msg += " [chunk: %06d_%s]" % (chunk_no, cid) # see "debug dump-archive-items" |
| 2059 | self.error_found = True |
| 2060 | logger.error(msg) |
| 2061 | |
| 2062 | def list_keys_safe(keys): |
| 2063 | return ", ".join(k.decode(errors="replace") if isinstance(k, bytes) else str(k) for k in keys) |
| 2064 | |
| 2065 | def valid_item(obj): |
| 2066 | if not isinstance(obj, StableDict): |
| 2067 | return False, "not a dictionary" |
| 2068 | keys = set(obj) |
| 2069 | if not required_item_keys.issubset(keys): |
| 2070 | return False, "missing required keys: " + list_keys_safe(required_item_keys - keys) |
| 2071 | if not keys.issubset(item_keys): |
| 2072 | return False, "invalid keys: " + list_keys_safe(keys - item_keys) |
| 2073 | return True, "" |
| 2074 | |
| 2075 | i = 0 |
| 2076 | archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=repository) |
| 2077 | for state, items in groupby(archive_items, missing_chunk_detector): |
| 2078 | items = list(items) |
| 2079 | if state % 2: |
| 2080 | for chunk_id in items: |
| 2081 | report("item metadata chunk missing", chunk_id, i) |
| 2082 | i += 1 |
| 2083 | continue |
| 2084 | if state > 0: |
| 2085 | unpacker.resync() |
| 2086 | for chunk_id, cdata in zip(items, repository.get_many(items)): |
| 2087 | try: |
| 2088 | _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_ARCHIVE_STREAM) |
| 2089 | unpacker.feed(data) |
| 2090 | for item in unpacker: |
| 2091 | valid, reason = valid_item(item) |
| 2092 | if valid: |
| 2093 | yield Item(internal_dict=item) |
| 2094 | else: |
| 2095 | report( |
nothing calls this directly
no test coverage detected