Class DataQueue

qlib/rl/utils/data_queue.py:24–188 (github.com/microsoft/qlib)

The main process (producer) reads items from ``dataset`` and puts the resulting data-points on a queue; sub-processes (consumers) retrieve them from that queue. A :class:`DataQueue` is ephemeral: once ``repeat`` is exhausted, you must create a new one.
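The producer/consumer split described above can be sketched with the standard library alone. This is a minimal illustration, not qlib's implementation: `produce`, `consume`, the `_EndOfData` marker and the doubling "work" are all hypothetical, and the listing on this page does not show how DataQueue actually tells consumers that the data is exhausted. Here one end-of-data marker is sent per consumer.

```python
import multiprocessing as mp
from typing import Any, Sequence


class _EndOfData:
    """Hypothetical end-of-data marker; one instance is sent per consumer."""


def produce(dataset: Sequence[Any], queue: Any, num_consumers: int) -> None:
    """Producer: read items via __len__/__getitem__ and put them on the queue."""
    for i in range(len(dataset)):
        queue.put(dataset[i])  # blocks while a bounded queue is full
    for _ in range(num_consumers):
        queue.put(_EndOfData())  # tell each consumer to stop


def consume(queue: Any, results: Any) -> None:
    """Consumer: pull data-points until an end-of-data marker arrives."""
    while True:
        item = queue.get()
        if isinstance(item, _EndOfData):
            break
        results.put(item * 2)  # stand-in for real per-item work


if __name__ == "__main__":
    data = list(range(10))
    tasks = mp.Queue(maxsize=4)  # bounded, in the spirit of queue_maxsize
    results = mp.Queue()
    workers = [mp.Process(target=consume, args=(tasks, results)) for _ in range(2)]
    for w in workers:
        w.start()
    produce(data, tasks, num_consumers=len(workers))
    # Drain results before joining so no worker is stuck flushing its queue buffer.
    print(sorted(results.get() for _ in data))
    for w in workers:
        w.join()
```

A bounded queue is what keeps the producer from running arbitrarily far ahead of the consumers: `put` simply waits until a consumer has taken an item.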


# Module-level context assumed for this excerpt (the listing starts at line 24 of the file).
import logging
import multiprocessing
import os
from typing import Generic, Sequence, TypeVar, cast

T = TypeVar("T")
_logger = logging.getLogger(__name__)  # assumption: stand-in for the module's own logger


class DataQueue(Generic[T]):
    """Main process (producer) produces data and stores them in a queue.
    Sub-processes (consumers) can retrieve the data-points from the queue.
    Data-points are generated via reading items from ``dataset``.

    :class:`DataQueue` is ephemeral. You must create a new DataQueue
    when the ``repeat`` is exhausted.

    See the documentation of :class:`qlib.rl.utils.FiniteVectorEnv` for more background.

    Parameters
    ----------
    dataset
        The dataset to read data from. Must implement ``__len__`` and ``__getitem__``.
    repeat
        Number of passes over the data-points. Use ``-1`` to iterate forever.
    shuffle
        If ``shuffle`` is true, the items will be read in random order.
    producer_num_workers
        Number of concurrent workers for data loading.
    queue_maxsize
        Maximum number of items held in the queue; once it is full, further puts block.
        ``0`` (the default) uses ``os.cpu_count()``, or ``1`` if the CPU count is unavailable.

    Examples
    --------
    >>> data_queue = DataQueue(my_dataset)
    >>> with data_queue:
    ...     ...

    In worker:

    >>> for data in data_queue:
    ...     print(data)
    """

    def __init__(
        self,
        dataset: Sequence[T],
        repeat: int = 1,
        shuffle: bool = True,
        producer_num_workers: int = 0,
        queue_maxsize: int = 0,
    ) -> None:
        if queue_maxsize == 0:
            # Default: bound the queue by the CPU count so the producer cannot run far ahead.
            if os.cpu_count() is not None:
                queue_maxsize = cast(int, os.cpu_count())
                _logger.info(f"Automatically set data queue maxsize to {queue_maxsize} to avoid overwhelming.")
            else:
                queue_maxsize = 1
                _logger.warning("CPU count not available. Setting queue maxsize to 1.")

        self.dataset: Sequence[T] = dataset
        self.repeat: int = repeat
        self.shuffle: bool = shuffle
        self.producer_num_workers: int = producer_num_workers

        self._activated: bool = False
        self._queue: multiprocessing.Queue = multiprocessing.Queue(maxsize=queue_maxsize)
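The listing stops at line 81, before the producer logic, so the exact read order is not visible here. As a sketch of the semantics the docstring gives for `repeat` and `shuffle`, the hypothetical generator `index_order` below yields dataset indices: `repeat` full passes, an endless stream for `-1`, and a random permutation per pass when `shuffle` is true. Reshuffling on every pass and the `seed` parameter are assumptions; `seed` is not a DataQueue argument.

```python
import random
from itertools import count
from typing import Iterator, Optional


def index_order(
    length: int, repeat: int = 1, shuffle: bool = True, seed: Optional[int] = None
) -> Iterator[int]:
    """Yield dataset indices for `repeat` passes; repeat == -1 means forever."""
    rng = random.Random(seed)  # hypothetical seed, for reproducible sketches only
    passes = count() if repeat == -1 else range(repeat)
    for _ in passes:
        indices = list(range(length))
        if shuffle:
            rng.shuffle(indices)  # assumed: a fresh permutation on every pass
        yield from indices
```

For a three-item dataset with `repeat=2, shuffle=False` the order is 0, 1, 2, 0, 1, 2. With `repeat=-1` the generator never ends, so the reader must bound it, for example with `itertools.islice`.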

Callers (7)

_exit_finite · Function · 0.90
_exit_infinite · Function · 0.90
train_seed_iterator · Method · 0.90
val_seed_iterator · Method · 0.90
test_seed_iterator · Method · 0.90

Calls

no outgoing calls

Tested by (5)

_exit_finite · Function · 0.72
_exit_infinite · Function · 0.72
test_seed_iterator · Method · 0.72