| 1233 | |
| 1234 | |
| 1235 | class LoggedIO: |
| 1236 | class SegmentFull(Exception): |
| 1237 | """raised when a segment is full, before opening next""" |
| 1238 | |
| 1239 | header_fmt = struct.Struct("<IIB") |
| 1240 | assert header_fmt.size == 9 |
| 1241 | header_no_crc_fmt = struct.Struct("<IB") |
| 1242 | assert header_no_crc_fmt.size == 5 |
| 1243 | crc_fmt = struct.Struct("<I") |
| 1244 | assert crc_fmt.size == 4 |
| 1245 | |
| 1246 | _commit = header_no_crc_fmt.pack(9, TAG_COMMIT) |
| 1247 | COMMIT = crc_fmt.pack(crc32(_commit)) + _commit |
| 1248 | |
| 1249 | HEADER_ID_SIZE = header_fmt.size + 32 |
| 1250 | ENTRY_HASH_SIZE = 8 |
| 1251 | |
| 1252 | def __init__(self, path, limit, segments_per_dir, capacity=90): |
| 1253 | self.path = path |
| 1254 | self.fds = LRUCache(capacity, dispose=self._close_fd) |
| 1255 | self.segment = 0 |
| 1256 | self.limit = limit |
| 1257 | self.segments_per_dir = segments_per_dir |
| 1258 | self.offset = 0 |
| 1259 | self._write_fd = None |
| 1260 | self._fds_cleaned = 0 |
| 1261 | |
| 1262 | def close(self): |
| 1263 | self.close_segment() |
| 1264 | self.fds.clear() |
| 1265 | self.fds = None # Just to make sure we're disabled |
| 1266 | |
| 1267 | def _close_fd(self, ts_fd): |
| 1268 | ts, fd = ts_fd |
| 1269 | safe_fadvise(fd.fileno(), 0, 0, "DONTNEED") |
| 1270 | fd.close() |
| 1271 | |
| 1272 | def get_segment_dirs(self, data_dir, start_index=MIN_SEGMENT_DIR_INDEX, end_index=MAX_SEGMENT_DIR_INDEX): |
| 1273 | """Returns generator yielding required segment dirs in data_dir as `os.DirEntry` objects. |
| 1274 | Start and end are inclusive. |
| 1275 | """ |
| 1276 | segment_dirs = ( |
| 1277 | f |
| 1278 | for f in os.scandir(data_dir) |
| 1279 | if f.is_dir() and f.name.isdigit() and start_index <= int(f.name) <= end_index |
| 1280 | ) |
| 1281 | return segment_dirs |
| 1282 | |
| 1283 | def get_segment_files(self, segment_dir, start_index=MIN_SEGMENT_INDEX, end_index=MAX_SEGMENT_INDEX): |
| 1284 | """Returns generator yielding required segment files in segment_dir as `os.DirEntry` objects. |
| 1285 | Start and end are inclusive. |
| 1286 | """ |
| 1287 | segment_files = ( |
| 1288 | f |
| 1289 | for f in os.scandir(segment_dir) |
| 1290 | if f.is_file() and f.name.isdigit() and start_index <= int(f.name) <= end_index |
| 1291 | ) |
| 1292 | return segment_files |