(self, instrument, field, start_time=None, end_time=None, freq="day")
| 504 | return hash_args(instrument, field, freq) |
| 505 | |
| 506 | def _expression(self, instrument, field, start_time=None, end_time=None, freq="day"): |
| 507 | _cache_uri = self._uri(instrument=instrument, field=field, start_time=None, end_time=None, freq=freq) |
| 508 | _instrument_dir = self.get_cache_dir(freq).joinpath(instrument.lower()) |
| 509 | cache_path = _instrument_dir.joinpath(_cache_uri) |
| 510 | # get calendar |
| 511 | from .data import Cal # pylint: disable=C0415 |
| 512 | |
| 513 | _calendar = Cal.calendar(freq=freq) |
| 514 | |
| 515 | _, _, start_index, end_index = Cal.locate_index(start_time, end_time, freq, future=False) |
| 516 | |
| 517 | if self.check_cache_exists(cache_path, suffix_list=[".meta"]): |
| 518 | """ |
| 519 | In most cases, we do not need reader_lock. |
| 520 | Because updating data is a small probability event compare to reading data. |
| 521 | |
| 522 | """ |
| 523 | # FIXME: Removing the reader lock may result in conflicts. |
| 524 | # with CacheUtils.reader_lock(self.r, 'expression-%s' % _cache_uri): |
| 525 | |
| 526 | # modify expression cache meta file |
| 527 | try: |
| 528 | # FIXME: Multiple readers may result in error visit number |
| 529 | if not self.remote: |
| 530 | CacheUtils.visit(cache_path) |
| 531 | series = read_bin(cache_path, start_index, end_index) |
| 532 | return series |
| 533 | except Exception: |
| 534 | series = None |
| 535 | self.logger.error("reading %s file error : %s" % (cache_path, traceback.format_exc())) |
| 536 | return series |
| 537 | else: |
| 538 | # normalize field |
| 539 | field = remove_fields_space(field) |
| 540 | # cache unavailable, generate the cache |
| 541 | _instrument_dir.mkdir(parents=True, exist_ok=True) |
| 542 | if not isinstance(eval(parse_field(field)), Feature): |
| 543 | # When the expression is not a raw feature |
| 544 | # generate expression cache if the feature is not a Feature |
| 545 | # instance |
| 546 | series = self.provider.expression(instrument, field, _calendar[0], _calendar[-1], freq) |
| 547 | if not series.empty: |
| 548 | # This expression is empty, we don't generate any cache for it. |
| 549 | with CacheUtils.writer_lock(self.r, f"{str(C.dpm.get_data_uri(freq))}:expression-{_cache_uri}"): |
| 550 | self.gen_expression_cache( |
| 551 | expression_data=series, |
| 552 | cache_path=cache_path, |
| 553 | instrument=instrument, |
| 554 | field=field, |
| 555 | freq=freq, |
| 556 | last_update=str(_calendar[-1]), |
| 557 | ) |
| 558 | return series.loc[start_index:end_index] |
| 559 | else: |
| 560 | return series |
| 561 | else: |
| 562 | # If the expression is a raw feature(such as $close, $open) |
| 563 | return self.provider.expression(instrument, field, start_time, end_time, freq) |
nothing calls this directly
no test coverage detected