(self)
| 491 | pass |
| 492 | |
def _dump_features(self):
    """Submit one ``_dump_bin`` task per symbol group and wait for all of them.

    ``self._all_data`` is grouped by ``self.symbol_field_name``. For each
    group the symbol is normalised with ``fname_to_code`` and the first/last
    dates are obtained from ``self._get_date(..., is_begin_end=True)``;
    groups for which either value is not a ``pd.Timestamp`` are skipped.

    * Symbol already in ``self._update_instruments``: only the dates strictly
      later than the recorded end are kept (sorted). When there are any, the
      recorded end is replaced by the formatted last date and ``_dump_bin``
      is submitted with those dates.
    * Symbol not yet recorded: its start/end entries are created and
      ``_dump_bin`` is submitted with ``self._new_calendar_list``.

    Note that ``self._update_instruments`` is modified when a task is
    submitted, not when it finishes. Exceptions raised by the tasks are not
    propagated: their formatted tracebacks are collected per symbol and
    written to the log.

    Returns:
        None
    """
    logger.info("start dump features......")
    error_code = {}
    with ProcessPoolExecutor(max_workers=self.works) as executor:
        # Maps each submitted future to the symbol it belongs to.
        futures = {}
        grouped = self._all_data.groupby(self.symbol_field_name, group_keys=False)
        for raw_symbol, symbol_df in grouped:
            symbol = fname_to_code(str(raw_symbol).lower()).upper()
            first_dt, last_dt = self._get_date(symbol_df, is_begin_end=True)
            if not isinstance(first_dt, pd.Timestamp) or not isinstance(last_dt, pd.Timestamp):
                continue

            if symbol not in self._update_instruments:
                # new stock
                date_range = self._update_instruments.setdefault(symbol, {})
                date_range[self.INSTRUMENTS_START_FIELD] = self._format_datetime(first_dt)
                date_range[self.INSTRUMENTS_END_FIELD] = self._format_datetime(last_dt)
                task = executor.submit(self._dump_bin, symbol_df, self._new_calendar_list)
                futures[task] = symbol
                continue

            # exists stock, will append data
            recorded = self._update_instruments[symbol]
            is_newer = symbol_df[self.date_field_name] > recorded[self.INSTRUMENTS_END_FIELD]
            newer_dates = symbol_df[is_newer][self.date_field_name]
            update_calendars = newer_dates.sort_values().to_list()
            if update_calendars:
                recorded[self.INSTRUMENTS_END_FIELD] = self._format_datetime(last_dt)
                task = executor.submit(self._dump_bin, symbol_df, update_calendars)
                futures[task] = symbol

        with tqdm(total=len(futures)) as progress:
            for finished in as_completed(futures):
                try:
                    finished.result()
                except Exception:
                    # Keep going: remember the traceback under the failing symbol.
                    error_code[futures[finished]] = traceback.format_exc()
                progress.update()
        logger.info(f"dump bin errors: {error_code}")

    logger.info("end of features dump.\n")
| 532 | |
| 533 | def dump(self): |
| 534 | self.save_calendars(self._new_calendar_list) |
no test coverage detected