(self, cache_uri, freq: str = "day")
| 949 | return features.swaplevel("datetime", "instrument") |
| 950 | |
| 951 | def update(self, cache_uri, freq: str = "day"): |
| 952 | cp_cache_uri = self.get_cache_dir(freq).joinpath(cache_uri) |
| 953 | meta_path = cp_cache_uri.with_suffix(".meta") |
| 954 | if not self.check_cache_exists(cp_cache_uri): |
| 955 | self.logger.info(f"The cache {cp_cache_uri} has corrupted. It will be removed") |
| 956 | self.clear_cache(cp_cache_uri) |
| 957 | return 2 |
| 958 | |
| 959 | im = DiskDatasetCache.IndexManager(cp_cache_uri) |
| 960 | with CacheUtils.writer_lock(self.r, f"{str(C.dpm.get_data_uri())}:dataset-{cache_uri}"): |
| 961 | with meta_path.open("rb") as f: |
| 962 | d = pickle.load(f) |
| 963 | instruments = d["info"]["instruments"] |
| 964 | fields = d["info"]["fields"] |
| 965 | freq = d["info"]["freq"] |
| 966 | last_update_time = d["info"]["last_update"] |
| 967 | inst_processors = d["info"].get("inst_processors", []) |
| 968 | index_data = im.get_index() |
| 969 | |
| 970 | self.logger.debug("Updating dataset: {}".format(d)) |
| 971 | from .data import Inst # pylint: disable=C0415 |
| 972 | |
| 973 | if Inst.get_inst_type(instruments) == Inst.DICT: |
| 974 | self.logger.info(f"The file {cache_uri} has dict cache. Skip updating") |
| 975 | return 1 |
| 976 | |
| 977 | # get newest calendar |
| 978 | from .data import Cal # pylint: disable=C0415 |
| 979 | |
| 980 | whole_calendar = Cal.calendar(start_time=None, end_time=None, freq=freq) |
| 981 | # The calendar since last updated |
| 982 | new_calendar = Cal.calendar(start_time=last_update_time, end_time=None, freq=freq) |
| 983 | |
| 984 | # get append data |
| 985 | if len(new_calendar) <= 1: |
| 986 | # Including last updated calendar, we only get 1 item. |
| 987 | # No future updating is needed. |
| 988 | return 1 |
| 989 | else: |
| 990 | # get the data needed after the historical data are removed. |
| 991 | # The start index of new data |
| 992 | current_index = len(whole_calendar) - len(new_calendar) + 1 |
| 993 | |
| 994 | # To avoid recursive import |
| 995 | from .data import ExpressionD # pylint: disable=C0415 |
| 996 | |
| 997 | # The existing data length |
| 998 | lft_etd = rght_etd = 0 |
| 999 | for field in fields: |
| 1000 | expr = ExpressionD.get_expression_instance(field) |
| 1001 | l, r = expr.get_extended_window_size() |
| 1002 | lft_etd = max(lft_etd, l) |
| 1003 | rght_etd = max(rght_etd, r) |
| 1004 | # remove the period that should be updated. |
| 1005 | if index_data.empty: |
| 1006 | # We don't have any data for such dataset. Nothing to remove |
| 1007 | rm_n_period = rm_lines = 0 |
| 1008 | else: |
Nothing calls this directly.
No test coverage detected.