| 231 | |
| 232 | |
class LogReader:
    """Iterate over the log messages behind one or more identifiers.

    Each identifier given at construction is resolved into a flat list of
    per-segment log identifiers (``self.logreader_identifiers``). A
    ``_LogFileReader`` is created for a segment the first time it is
    requested and is cached by its index in that list.
    """

    def _parse_identifier(self, identifier: str) -> list[str]:
        """Resolve a single identifier into the list of logs to read.

        The identifier is first passed through ``parse_indirect``. If
        ``parse_direct`` recognises the result, ``direct_source`` provides the
        list; otherwise ``auto_source`` is consulted with this reader's
        sources and default mode.
        """
        # useradmin, etc.
        resolved = parse_indirect(identifier)

        # direct url or file
        if parse_direct(resolved) is not None:
            return direct_source(resolved)

        return auto_source(resolved, self.sources, self.default_mode)

    def __init__(self, identifier: str | list[str], default_mode: ReadMode = ReadMode.RLOG,
                 sources: list[Source] | None = None, sort_by_time=False, only_union_types=False):
        """Store the configuration and resolve the identifiers.

        Args:
            identifier: One identifier, or a list of them. A single string is
                wrapped in a one-element list.
            default_mode: Mode handed to ``auto_source`` during resolution.
            sources: Sources handed to ``auto_source``. When ``None``, the
                module's built-in source list is used.
            sort_by_time: Forwarded to every ``_LogFileReader``.
            only_union_types: Forwarded to every ``_LogFileReader``.
        """
        if sources is None:
            sources = [internal_source, comma_api_source, openpilotci_source, comma_car_segments_source]

        self.default_mode = default_mode
        self.sources = sources
        # Always keep the identifiers as a list so reset() can loop over them.
        self.identifier = [identifier] if isinstance(identifier, str) else identifier

        self.sort_by_time = sort_by_time
        self.only_union_types = only_union_types

        # Readers created so far, keyed by index into logreader_identifiers.
        self.__lrs: dict[int, _LogFileReader] = {}
        self.reset()

    def _get_lr(self, i):
        """Return the reader for segment index ``i``, creating it on first use."""
        cache = self.__lrs
        if i not in cache:
            cache[i] = _LogFileReader(
                self.logreader_identifiers[i],
                sort_by_time=self.sort_by_time,
                only_union_types=self.only_union_types,
            )
        return cache[i]

    def __iter__(self):
        """Yield every item of every segment reader, in segment order."""
        segment_count = len(self.logreader_identifiers)
        for index in range(segment_count):
            yield from self._get_lr(index)

    def _run_on_segment(self, func, i):
        """Apply ``func`` to the reader of segment index ``i`` and return its result."""
        reader = self._get_lr(i)
        return func(reader)

    def run_across_segments(self, num_processes, func, disable_tqdm=False, desc=None):
        """Run ``func`` on every segment in a process pool and flatten the results.

        Args:
            num_processes: Size of the ``multiprocessing.Pool``.
            func: Callable taking a segment reader and returning an iterable.
            disable_tqdm: Forwarded to ``tqdm`` as ``disable``.
            desc: Forwarded to ``tqdm`` as ``desc``.

        Returns:
            A single list built by extending with each segment's result, in
            segment order.
        """
        with multiprocessing.Pool(num_processes) as pool:
            collected = []
            num_segs = len(self.logreader_identifiers)
            worker = partial(self._run_on_segment, func)
            # imap yields results in submission order, so output follows segment order.
            progress = tqdm.tqdm(pool.imap(worker, range(num_segs)), total=num_segs, disable=disable_tqdm, desc=desc)
            for segment_result in progress:
                collected.extend(segment_result)
            return collected

    def reset(self):
        """Rebuild ``logreader_identifiers`` from the stored identifiers."""
        self.logreader_identifiers = []
        for raw_identifier in self.identifier:
            resolved = self._parse_identifier(raw_identifier)
            self.logreader_identifiers.extend(resolved)

    @staticmethod
    def from_bytes(dat):
        """Build a ``_LogFileReader`` from in-memory data, using an empty file name."""
        return _LogFileReader("", dat=dat)
no outgoing calls