class DataLoader(MiniBatchTransformer):
    """Multiprocessing DataLoader.

    Iterates over the data pipeline with everything before feature fetching
    (i.e. :class:`dgl.graphbolt.FeatureFetcher`) in subprocesses, and
    everything after feature fetching in the main process. The datapipe
    is modified in-place as a result.

    If the ``copy_to`` operation is placed before feature fetching in the
    data pipeline, ``num_workers`` must be 0, because using CUDA in
    multiple worker processes is not supported.

    Parameters
    ----------
    datapipe : DataPipe
        The data pipeline.
    num_workers : int, optional
        Number of worker processes. Default is 0.
    persistent_workers : bool, optional
        If True, the data loader does not shut down the worker processes
        after the dataset has been consumed once, which keeps the worker
        instances alive for subsequent passes. Default is True.
    max_uva_threads : int, optional
        Limits the number of CUDA threads used for UVA copies so that the
        remaining computations can run concurrently with them. Setting it
        too high limits the amount of overlap, while setting it too low may
        leave the PCIe bandwidth underutilized. The manually tuned default
        is 10240, which corresponds to roughly 5-7 Streaming
        Multiprocessors.
    """

    def __init__(
        self,
        datapipe,
        num_workers=0,
        persistent_workers=True,
        max_uva_threads=10240,
    ):
        # Multiprocessing requires two modifications to the datapipe:
        #
        # 1. Insert a stage after ItemSampler to distribute the
        #    minibatches evenly across processes.
        # 2. Cut the datapipe at FeatureFetcher, and wrap the inner datapipe
        #    of the FeatureFetcher with a multiprocessing PyTorch DataLoader.

        datapipe = datapipe.mark_end()
        datapipe_graph = traverse_dps(datapipe)

        if num_workers > 0:
            # (1) Insert minibatch distribution.
            # TODO(BarclayII): Currently I'm using sharding_filter() as a
            # concept demonstration. Later on minibatch distribution should be
            # merged into ItemSampler to maximize efficiency.
            item_samplers = find_dps(
                datapipe_graph,
                ItemSampler,
            )
            for item_sampler in item_samplers:
                datapipe_graph = replace_dp(
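Step (1) in `__init__` inserts a distribution stage after each `ItemSampler` so that every worker process receives a disjoint share of the minibatches; the TODO notes that `sharding_filter()` is used for this as a concept demonstration. The sketch below shows round-robin distribution in plain Python, assuming it approximates what `sharding_filter()` does (keep item `i` in the worker where `i % num_workers == worker_id`). `shard_round_robin` is a hypothetical helper, not a GraphBolt or torchdata API.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shard_round_robin(items: Iterable[T], num_workers: int, worker_id: int) -> Iterator[T]:
    """Yield only the items assigned to ``worker_id`` (hypothetical helper).

    Item i goes to worker i % num_workers, so the workers see disjoint
    subsets whose sizes differ by at most one.
    """
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    if not 0 <= worker_id < num_workers:
        raise ValueError("worker_id must be in [0, num_workers)")
    for i, item in enumerate(items):
        # Keep only the items whose position maps to this worker.
        if i % num_workers == worker_id:
            yield item


# Ten minibatch IDs distributed across three hypothetical workers.
minibatches = list(range(10))
shards: List[List[int]] = [
    list(shard_round_robin(minibatches, 3, w)) for w in range(3)
]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Together the shards cover every minibatch exactly once, which is the "distribute evenly across processes" property the comment in `__init__` asks for.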
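Step (2) cuts the pipeline at `FeatureFetcher`: the stages before it run in worker processes, while the fetcher and everything after it run in the main process. The sketch below models a linear pipeline as a list of plain functions and splits it at a chosen stage. `split_at`, `run`, and the three stage functions are hypothetical stand-ins rather than GraphBolt APIs, and no processes are actually spawned; the comments only mark which half would run where.

```python
from typing import Any, Callable, List, Sequence, Tuple

Stage = Callable[[Any], Any]


def split_at(stages: Sequence[Stage], cut: Stage) -> Tuple[List[Stage], List[Stage]]:
    """Split a linear pipeline into (stages before `cut`, `cut` and after)."""
    idx = list(stages).index(cut)  # raises ValueError if `cut` is absent
    return list(stages[:idx]), list(stages[idx:])


def run(stages: Sequence[Stage], x: Any) -> Any:
    """Apply the stages in order to one input."""
    for stage in stages:
        x = stage(x)
    return x


# Hypothetical stages standing in for sampling, feature fetching and copy_to.
def sample_neighbors(seeds):
    return {"seeds": seeds, "nodes": sorted(set(seeds) | {s + 1 for s in seeds})}


def fetch_features(batch):
    return {**batch, "feats": [n * 10 for n in batch["nodes"]]}


def copy_to_device(batch):
    return {**batch, "device": "cuda"}  # placeholder label; no real transfer


pipeline = [sample_neighbors, fetch_features, copy_to_device]
worker_part, main_part = split_at(pipeline, fetch_features)

# worker_part would run inside worker processes; main_part (feature
# fetching and everything after it) would run in the main process.
partial = run(worker_part, [1, 5])
result = run(main_part, partial)
assert result == run(pipeline, [1, 5])  # splitting does not change the output
print(result["nodes"], result["feats"])  # [1, 2, 5, 6] [10, 20, 50, 60]
```

In this toy split, `copy_to_device` falls in the main-process half, which is consistent with the docstring's note that CUDA use in multiple worker processes is not supported.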
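The `max_uva_threads` description says the default of 10240 threads means "around 5-7 Streaming Multiprocessors". One plausible reading of that range is the arithmetic below, assuming a GPU whose SMs hold at most 1536 to 2048 resident threads each; this per-SM limit varies by NVIDIA architecture, so check the specification for your device. `sms_occupied` is a hypothetical helper.

```python
def sms_occupied(max_uva_threads: int, threads_per_sm: int) -> float:
    """Approximate number of SMs filled by the UVA copy threads (hypothetical)."""
    if threads_per_sm <= 0:
        raise ValueError("threads_per_sm must be positive")
    return max_uva_threads / threads_per_sm


DEFAULT_MAX_UVA_THREADS = 10240  # default from the docstring

# Assumed per-SM resident-thread limits; not taken from the source.
for tps in (2048, 1536):
    print(f"{tps} threads/SM -> {sms_occupied(DEFAULT_MAX_UVA_THREADS, tps):.2f} SMs")
# 2048 threads/SM -> 5.00 SMs
# 1536 threads/SM -> 6.67 SMs
```

Under these assumptions the copy threads occupy only a few SMs, leaving the rest of the GPU free for the overlapping computation the docstring describes.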