(
self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=0, inst_processors=[]
)
| 693 | return df |
| 694 | |
def _dataset(
    self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=0, inst_processors=[]
):
    """Return the feature dataset, optionally served from / written to the disk cache.

    Behavior by ``disk_cache`` value:

    * ``0`` -- bypass the cache and delegate directly to ``self.provider.dataset``.
    * ``1`` -- read from the cache file when it exists; otherwise generate it.
    * ``2`` -- regenerate the cache file even when it already exists.

    For any other value, an existing cache file is neither read nor rebuilt
    and an empty ``DataFrame`` is returned; a missing cache file is still
    generated.

    The cache URI is computed with ``start_time=None`` and ``end_time=None``,
    so it does not depend on the requested time window; the window is applied
    when reading the cache, or by slicing a freshly generated dataset.

    Raises
    ------
    ValueError
        If ``disk_cache`` is not 0 and ``inst_processors`` is non-empty.
    """
    if disk_cache == 0:
        # The dataset cache is configured but deliberately not used for this call.
        return self.provider.dataset(
            instruments, fields, start_time, end_time, freq, inst_processors=inst_processors
        )

    # FIXME: a cache built after resampling, when read back and truncated at
    # end_time, yields an incomplete date range.
    if inst_processors:
        raise ValueError(
            f"{self.__class__.__name__} does not support inst_processor. "
            f"Please use `D.features(disk_cache=0)` or `qlib.init(dataset_cache=None)`"
        )

    cache_key = self._uri(
        instruments=instruments,
        fields=fields,
        start_time=None,
        end_time=None,
        freq=freq,
        disk_cache=disk_cache,
        inst_processors=inst_processors,
    )
    cache_file = self.get_cache_dir(freq).joinpath(cache_key)

    result = pd.DataFrame()
    cache_present = self.check_cache_exists(cache_file)

    if cache_present and disk_cache == 1:
        # Serve the request from the existing cache under a reader lock.
        with CacheUtils.reader_lock(self.r, f"{str(C.dpm.get_data_uri(freq))}:dataset-{cache_key}"):
            CacheUtils.visit(cache_file)
            result = self.read_data_from_cache(cache_file, start_time, end_time, fields)

    # (Re)build when there is no cache file, or when a rebuild is forced.
    must_build = (not cache_present) or disk_cache == 2
    if not must_build:
        return result

    with CacheUtils.writer_lock(self.r, f"{str(C.dpm.get_data_uri(freq))}:dataset-{cache_key}"):
        result = self.gen_dataset_cache(
            cache_path=cache_file,
            instruments=instruments,
            fields=fields,
            freq=freq,
            inst_processors=inst_processors,
        )

    if result.empty:
        return result

    # Generation is not given the time window; restrict the second index
    # level to the requested [start_time, end_time] range here.
    return result.sort_index().loc(axis=0)[:, start_time:end_time]
| 748 | |
| 749 | def _dataset_uri( |
| 750 | self, instruments, fields, start_time=None, end_time=None, freq="day", disk_cache=0, inst_processors=[] |
nothing calls this directly
no test coverage detected