Load and process the data, then return the dataset. By default, the per-instrument work is run in parallel across multiple kernels (workers).
(instruments_d, column_names, start_time, end_time, freq, inst_processors=[])
| 546 | |
@staticmethod
def dataset_processor(instruments_d, column_names, start_time, end_time, freq, inst_processors=None):
    """
    Load and process the data, return the data set.
    - default using multi-kernel method.

    Parameters
    ----------
    instruments_d : dict or sized iterable
        Either a mapping ``instrument -> spans`` or a plain collection of
        instruments; in the latter case ``spans`` is ``None`` for every instrument.
    column_names : list
        The requested fields. They are normalized with ``normalize_cache_fields``
        before calculation and restored via ``DiskDatasetCache.cache_to_origin_data``.
    start_time, end_time :
        Forwarded unchanged to ``DatasetProvider.inst_calculator``.
    freq :
        Forwarded to ``DatasetProvider.inst_calculator`` and used to look up the
        number of kernels via ``C.get_kernels``.
    inst_processors : list, optional
        Forwarded to ``DatasetProvider.inst_calculator``. ``None`` (the default)
        is treated as an empty list.

    Returns
    -------
    pd.DataFrame
        The per-instrument results concatenated under an outer ``instrument``
        index level, in sorted instrument order. When no instrument yields any
        rows, an empty float32 frame indexed by ``(instrument, datetime)`` with
        ``column_names`` as columns is returned.
    """
    # Use a fresh list per call rather than a shared mutable default argument.
    if inst_processors is None:
        inst_processors = []

    normalize_column_names = normalize_cache_fields(column_names)
    # One process for one task, so that the memory will be freed quicker.
    workers = max(min(C.get_kernels(freq), len(instruments_d)), 1)

    # Pair every instrument with its spans (None when no spans are supplied).
    if isinstance(instruments_d, dict):
        inst_spans = instruments_d.items()
    else:
        inst_spans = ((inst, None) for inst in instruments_d)

    inst_l = []
    task_l = []
    for inst, spans in inst_spans:
        inst_l.append(inst)
        task_l.append(
            delayed(DatasetProvider.inst_calculator)(
                inst, start_time, end_time, freq, normalize_column_names, spans, C, inst_processors
            )
        )

    results = ParallelExt(n_jobs=workers, backend=C.joblib_backend, maxtasksperchild=C.maxtasksperchild)(task_l)
    data = dict(zip(inst_l, results))

    # Drop empty results and keep instruments in sorted order.
    # NOTE: Python version >= 3.6; in versions after python3.6, dict will always guarantee the insertion order
    new_data = {inst: data[inst] for inst in sorted(data) if len(data[inst]) > 0}

    if new_data:
        data = pd.concat(new_data, names=["instrument"], sort=False)
        data = DiskDatasetCache.cache_to_origin_data(data, column_names)
    else:
        # Nothing was loaded: return an empty frame with the expected index and columns.
        data = pd.DataFrame(
            index=pd.MultiIndex.from_arrays([[], []], names=("instrument", "datetime")),
            columns=column_names,
            dtype=np.float32,
        )

    return data
| 598 | |
| 599 | @staticmethod |
| 600 | def inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[]): |
no test coverage detected