MCPcopy
hub / github.com/microsoft/qlib / dataset_processor

Method dataset_processor

qlib/data/data.py:548–597  ·  view source on GitHub ↗

Load and process the data and return the dataset. By default, the work is parallelized across multiple kernels (workers).

(instruments_d, column_names, start_time, end_time, freq, inst_processors=[])

Source from the content-addressed store, hash-verified

546
@staticmethod
def dataset_processor(instruments_d, column_names, start_time, end_time, freq, inst_processors=None):
    """
    Load and process the data, return the data set.
    - default using multi-kernel method.

    Parameters
    ----------
    instruments_d : dict or sequence
        Either a dict mapping each instrument to its spans, or a plain
        sequence of instruments (each instrument then gets ``spans=None``).
    column_names : list
        Columns to load. They are normalized with ``normalize_cache_fields``
        before being dispatched to the workers. The original (un-normalized)
        names are used to restore the output via
        ``DiskDatasetCache.cache_to_origin_data``.
    start_time, end_time :
        Time range forwarded unchanged to ``DatasetProvider.inst_calculator``.
    freq : str
        Data frequency. It is forwarded to ``inst_calculator`` and also
        determines the worker count via ``C.get_kernels(freq)``.
    inst_processors : list, optional
        Processors forwarded to ``inst_calculator`` for every instrument.
        ``None`` (the default) is treated as an empty list; a fresh list is
        created per call instead of sharing one mutable default.

    Returns
    -------
    pd.DataFrame
        Per-instrument results concatenated under an index level named
        ``instrument``. Instruments are sorted and empty results are dropped.
        If no instrument produced any rows, the result is an empty float32
        frame with a ``(instrument, datetime)`` MultiIndex and
        ``column_names`` as columns.
    """
    if inst_processors is None:
        inst_processors = []

    normalize_column_names = normalize_cache_fields(column_names)
    # One process for one task, so that the memory will be freed quicker.
    # Never use more workers than there are instruments, but always at least one.
    workers = max(min(C.get_kernels(freq), len(instruments_d)), 1)

    # Pair every instrument with its spans; a plain sequence carries no spans.
    if isinstance(instruments_d, dict):
        inst_spans = list(instruments_d.items())
    else:
        inst_spans = [(inst, None) for inst in instruments_d]

    inst_l = [inst for inst, _ in inst_spans]
    # The global config ``C`` is passed explicitly as ``g_config``.
    # NOTE(review): presumably so workers that may run in separate processes
    # see the same configuration; confirm against inst_calculator.
    task_l = [
        delayed(DatasetProvider.inst_calculator)(
            inst, start_time, end_time, freq, normalize_column_names, spans, C, inst_processors
        )
        for inst, spans in inst_spans
    ]

    results = ParallelExt(n_jobs=workers, backend=C.joblib_backend, maxtasksperchild=C.maxtasksperchild)(task_l)
    data = dict(zip(inst_l, results))

    # Drop empty results and order by instrument name.
    # NOTE: Python version >= 3.6; dict preserves insertion order, so the sorted
    # order carries through to pd.concat.
    new_data = {inst: data[inst] for inst in sorted(data) if len(data[inst]) > 0}

    if new_data:
        merged = pd.concat(new_data, names=["instrument"], sort=False)
        return DiskDatasetCache.cache_to_origin_data(merged, column_names)

    # Nothing was loaded: return an empty frame with the expected index/columns.
    return pd.DataFrame(
        index=pd.MultiIndex.from_arrays([[], []], names=("instrument", "datetime")),
        columns=column_names,
        dtype=np.float32,
    )
598
599 @staticmethod
600 def inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[]):

Callers 2

datasetMethod · 0.80
datasetMethod · 0.80

Calls 4

normalize_cache_fieldsFunction · 0.85
ParallelExtClass · 0.85
get_kernelsMethod · 0.80
cache_to_origin_dataMethod · 0.80

Tested by

no test coverage detected