class DataQueue(Generic[T]):
    """The main process (producer) produces data and stores it in a queue.
    Sub-processes (consumers) retrieve the data-points from the queue.
    Data-points are generated by reading items from ``dataset``.

    :class:`DataQueue` is ephemeral. You must create a new :class:`DataQueue`
    once the data-points have been iterated ``repeat`` times.

    See the documentation of :class:`qlib.rl.utils.FiniteVectorEnv` for more background.

    Parameters
    ----------
    dataset
        The dataset to read data from. Must implement ``__len__`` and ``__getitem__``.
    repeat
        Number of times to iterate over the data-points. Use ``-1`` to iterate forever.
    shuffle
        If true, the items are read in random order.
    producer_num_workers
        Number of concurrent workers used for data loading.
    queue_maxsize
        Maximum number of items the queue holds before further puts block.
        ``0`` (the default) sets it to the CPU count, or to ``1`` if the CPU
        count is unavailable.

    Examples
    --------
    >>> data_queue = DataQueue(my_dataset)
    >>> with data_queue:
    ...     ...

    In a worker:

    >>> for data in data_queue:
    ...     print(data)
    """

    def __init__(
        self,
        dataset: Sequence[T],
        repeat: int = 1,
        shuffle: bool = True,
        producer_num_workers: int = 0,
        queue_maxsize: int = 0,
    ) -> None:
        if queue_maxsize == 0:
            if os.cpu_count() is not None:
                queue_maxsize = cast(int, os.cpu_count())
                _logger.info(f"Automatically set data queue maxsize to {queue_maxsize} to avoid overwhelming.")
            else:
                queue_maxsize = 1
                _logger.warning("CPU count not available. Setting queue maxsize to 1.")

        self.dataset: Sequence[T] = dataset
        self.repeat: int = repeat
        self.shuffle: bool = shuffle
        self.producer_num_workers: int = producer_num_workers

        self._activated: bool = False
        self._queue: multiprocessing.Queue = multiprocessing.Queue(maxsize=queue_maxsize)
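The producer/consumer flow described in the docstring can be sketched with the standard library alone. This is a minimal sketch, not qlib's implementation: the helper names ``default_maxsize``, ``iterate_indices``, ``produce`` and ``consume`` and the ``None`` end-of-stream sentinel are hypothetical. To keep the example self-contained, the producer runs in a thread instead of a main process feeding worker processes. It mirrors three behaviours visible in the source above: the ``queue_maxsize`` fallback to the CPU count (or ``1``), ``repeat=-1`` meaning "iterate forever", and ``put`` blocking once the bounded queue is full.

```python
import multiprocessing
import os
import random
import threading
from typing import Any, Iterator, List, Sequence

# Hypothetical end-of-stream marker (not part of qlib); assumes no dataset item is None.
_DONE = None


def default_maxsize() -> int:
    """Mirror the fallback in DataQueue.__init__: CPU count, else 1."""
    count = os.cpu_count()
    return count if count is not None else 1


def iterate_indices(n: int, repeat: int, shuffle: bool, seed: int = 0) -> Iterator[int]:
    """Yield indices 0..n-1 ``repeat`` times; ``repeat == -1`` loops forever."""
    if n == 0:
        return  # nothing to yield; avoids spinning forever when repeat == -1
    rng = random.Random(seed)
    epoch = 0
    while repeat == -1 or epoch < repeat:
        order = list(range(n))
        if shuffle:
            rng.shuffle(order)  # fresh random order for each pass
        yield from order
        epoch += 1


def produce(dataset: Sequence[Any], queue: Any, repeat: int, shuffle: bool, num_consumers: int) -> None:
    """Producer: read items from the dataset and push them into the bounded queue."""
    for idx in iterate_indices(len(dataset), repeat, shuffle):
        queue.put(dataset[idx])  # blocks while the queue already holds maxsize items
    for _ in range(num_consumers):
        queue.put(_DONE)  # one sentinel per consumer so each of them stops


def consume(queue: Any) -> List[Any]:
    """Consumer: pull items until the end-of-stream sentinel arrives."""
    items: List[Any] = []
    while True:
        item = queue.get()
        if item is _DONE:
            break
        items.append(item)
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=default_maxsize())
    # A thread stands in for the producing main process to keep the demo portable.
    producer = threading.Thread(target=produce, args=(list(range(5)), q, 2, True, 1))
    producer.start()
    received = consume(q)
    producer.join()
    print(sorted(received))  # [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
```

Because ``multiprocessing.Queue`` pickles what it carries, a module-level function like ``consume`` could also run in worker processes. A dataset that legitimately contains ``None`` would need a different end marker than the one assumed here.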