| 354 | |
| 355 | |
class DumpDataFix(DumpDataAll):
    """Dump variant that extends an existing instruments table.

    Instruments already present in the instruments file are kept as they are;
    date ranges are computed only for data files whose symbol is not yet
    recorded there.
    """

    def _dump_instruments(self):
        """Add date ranges for unrecorded symbols and save the instruments table.

        Only files in ``self.df_files`` whose upper-cased symbol is missing from
        ``self._old_instruments`` are processed. A file is recorded only when
        the date lookup yields two ``pd.Timestamp`` values; otherwise it is
        skipped. The merged mapping is then passed to ``save_instruments``.
        """
        logger.info("start dump instruments......")
        # ``partial`` over the bound method is used as the worker callable for
        # the process pool below.
        date_range_of = partial(self._get_date, is_begin_end=True)
        pending_files = sorted(
            path for path in self.df_files if self.get_symbol_from_file(path).upper() not in self._old_instruments
        )
        with tqdm(total=len(pending_files)) as progress, ProcessPoolExecutor(max_workers=self.works) as pool:
            # ``map`` yields results in input order, so zipping pairs every
            # file with its own (begin, end) result.
            for path, (first_dt, last_dt) in zip(pending_files, pool.map(date_range_of, pending_files)):
                if isinstance(first_dt, pd.Timestamp) and isinstance(last_dt, pd.Timestamp):
                    name = self.get_symbol_from_file(path).upper()
                    entry = self._old_instruments.setdefault(name, {})
                    entry[self.INSTRUMENTS_START_FIELD] = self._format_datetime(first_dt)
                    entry[self.INSTRUMENTS_END_FIELD] = self._format_datetime(last_dt)
                # Advance the bar for every file, whether recorded or skipped.
                progress.update()
        instruments_table = pd.DataFrame.from_dict(self._old_instruments, orient="index")
        instruments_table.index.names = [self.symbol_field_name]
        self.save_instruments(instruments_table.reset_index())
        logger.info("end of instruments dump.\n")

    def dump(self):
        """Load the calendar and existing instruments, then dump instruments and features.

        The calendar is read from ``<freq>.txt`` in the calendars directory.
        The existing instruments file is read into a dict keyed by symbol and
        stored in ``self._old_instruments`` before ``_dump_instruments`` and
        ``_dump_features`` are called.
        """
        calendar_path = self._calendars_dir.joinpath(f"{self.freq}.txt")
        self._calendars_list = self._read_calendars(calendar_path)

        instruments_path = self._instruments_dir.joinpath(self.INSTRUMENTS_FILE_NAME)
        recorded = self._read_instruments(instruments_path)
        indexed = recorded.set_index([self.symbol_field_name])
        # noinspection PyAttributeOutsideInit
        self._old_instruments = indexed.to_dict(orient="index")  # type: dict

        self._dump_instruments()
        self._dump_features()
| 390 | |
| 391 | |
| 392 | class DumpDataUpdate(DumpDataBase): |
no outgoing calls