| 176 | raise ValueError("Invalid entry type in self.meta") |
| 177 | |
| 178 | def iter_archive_items(self, archive_item_ids, filter=None): |
| 179 | unpacker = msgpack.Unpacker() |
| 180 | |
| 181 | # Current offset in the metadata stream, which consists of all metadata chunks glued together |
| 182 | stream_offset = 0 |
| 183 | # Offset of the current chunk in the metadata stream |
| 184 | chunk_begin = 0 |
| 185 | # Length of the chunk preceding the current chunk |
| 186 | last_chunk_length = 0 |
| 187 | msgpacked_bytes = b"" |
| 188 | |
| 189 | write_offset = self.write_offset |
| 190 | meta = self.meta |
| 191 | pack_indirect_into = self.indirect_entry_struct.pack_into |
| 192 | |
| 193 | for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)): |
| 194 | # Store the chunk ID in the meta-array |
| 195 | if write_offset + 32 >= len(meta): |
| 196 | meta.extend(bytes(self.GROW_META_BY)) |
| 197 | meta[write_offset : write_offset + 32] = key |
| 198 | current_id_offset = write_offset |
| 199 | write_offset += 32 |
| 200 | |
| 201 | chunk_begin += last_chunk_length |
| 202 | last_chunk_length = len(data) |
| 203 | |
| 204 | unpacker.feed(data) |
| 205 | while True: |
| 206 | try: |
| 207 | item = unpacker.unpack() |
| 208 | need_more_data = False |
| 209 | except msgpack.OutOfData: |
| 210 | need_more_data = True |
| 211 | |
| 212 | start = stream_offset - chunk_begin |
| 213 | # tell() is not helpful for the need_more_data case, but we know it is the remainder |
| 214 | # of the data in that case. in the other case, tell() works as expected. |
| 215 | length = (len(data) - start) if need_more_data else (unpacker.tell() - stream_offset) |
| 216 | msgpacked_bytes += data[start : start + length] |
| 217 | stream_offset += length |
| 218 | |
| 219 | if need_more_data: |
| 220 | # Need more data, feed the next chunk |
| 221 | break |
| 222 | |
| 223 | item = Item(internal_dict=item) |
| 224 | if filter and not filter(item): |
| 225 | msgpacked_bytes = b"" |
| 226 | continue |
| 227 | |
| 228 | current_item = msgpacked_bytes |
| 229 | current_item_length = len(current_item) |
| 230 | current_spans_chunks = stream_offset - current_item_length < chunk_begin |
| 231 | msgpacked_bytes = b"" |
| 232 | |
| 233 | if write_offset + 9 >= len(meta): |
| 234 | meta.extend(bytes(self.GROW_META_BY)) |
| 235 | |