Repository: github.com/dmlc/dgl

Class DataLoader

Defined in python/dgl/graphbolt/dataloader.py, lines 78–191.

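Multiprocessing DataLoader. Iterates over the data pipeline with everything before feature fetching (i.e. :class:`dgl.graphbolt.FeatureFetcher`) in subprocesses, and everything after feature fetching in the main process. The datapipe is modified in-place as a result. When the copy_to operation is placed earlier in the data pipeline (before feature fetching), num_workers must be 0, because using CUDA in multiple worker processes is not supported.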
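The split described above can be sketched in plain Python. This is not GraphBolt's implementation: `Stage`, `split_at_feature_fetcher`, and `run` are hypothetical names, and stages are modeled as named per-item functions rather than DataPipes. The sketch cuts a list of stages at the first `fetch_feature` stage (worker side before the cut, main process from the cut on) and rejects a `copy_to` stage on the worker side when `num_workers > 0`.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


@dataclass
class Stage:
    """Hypothetical pipeline stage: a name plus a per-item function."""

    name: str
    fn: Callable[[Any], Any]


def split_at_feature_fetcher(
    stages: List[Stage], num_workers: int
) -> Tuple[List[Stage], List[Stage]]:
    """Return (worker_stages, main_stages), cut at the first "fetch_feature".

    Stages before the cut would run in worker subprocesses; the feature
    fetching stage and everything after it stay in the main process.
    """
    cut = [s.name for s in stages].index("fetch_feature")  # ValueError if absent
    worker_stages, main_stages = stages[:cut], stages[cut:]
    # CUDA cannot be used from multiple worker processes, so a device copy
    # on the worker side is only allowed when no workers are spawned.
    if num_workers > 0 and any(s.name == "copy_to" for s in worker_stages):
        raise ValueError("copy_to before fetch_feature requires num_workers=0")
    return worker_stages, main_stages


def run(stages: List[Stage], item: Any) -> Any:
    """Apply each stage's function to one item, in order."""
    for stage in stages:
        item = stage.fn(item)
    return item


stages = [
    Stage("item_sampler", lambda x: x),
    Stage("sample_neighbor", lambda x: x + 1),
    Stage("fetch_feature", lambda x: x * 10),
    Stage("copy_to", lambda x: x),
]
worker_stages, main_stages = split_at_feature_fetcher(stages, num_workers=4)
print([s.name for s in worker_stages])  # ['item_sampler', 'sample_neighbor']
print([s.name for s in main_stages])    # ['fetch_feature', 'copy_to']
print(run(main_stages, run(worker_stages, 1)))  # 20
```

According to the source comments, GraphBolt wraps the worker-side portion in a multiprocessing PyTorch DataLoader, so in the real class only that portion's outputs cross the process boundary; the sketch just runs both halves in one process to show where the cut falls.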

class DataLoader(MiniBatchTransformer):
    """Multiprocessing DataLoader.

    Iterates over the data pipeline with everything before feature fetching
    (i.e. :class:`dgl.graphbolt.FeatureFetcher`) in subprocesses, and
    everything after feature fetching in the main process. The datapipe
    is modified in-place as a result.

    When the copy_to operation is placed earlier in the data pipeline
    (before feature fetching), the num_workers argument is required to be 0,
    as utilizing CUDA in multiple worker processes is not supported.

    Parameters
    ----------
    datapipe : DataPipe
        The data pipeline.
    num_workers : int, optional
        Number of worker processes. Default is 0.
    persistent_workers : bool, optional
        If True, the data loader will not shut down the worker processes after
        a dataset has been consumed once, so the worker instances stay alive
        for later iterations over the dataset. Default is True.
    max_uva_threads : int, optional
        Limits the number of CUDA threads used for UVA copies so that the rest
        of the computations can run simultaneously with them. Setting it too
        high limits the amount of overlap, while setting it too low may leave
        the PCIe bandwidth underutilized. The manually tuned default is 10240,
        which corresponds to roughly 5-7 Streaming Multiprocessors.
    """
    def __init__(
        self,
        datapipe,
        num_workers=0,
        persistent_workers=True,
        max_uva_threads=10240,
    ):
        # Multiprocessing requires two modifications to the datapipe:
        #
        # 1. Insert a stage after ItemSampler to distribute the
        #    minibatches evenly across processes.
        # 2. Cut the datapipe at FeatureFetcher, and wrap the inner datapipe
        #    of the FeatureFetcher with a multiprocessing PyTorch DataLoader.

        datapipe = datapipe.mark_end()
        datapipe_graph = traverse_dps(datapipe)

        if num_workers > 0:
            # (1) Insert minibatch distribution.
            # TODO(BarclayII): Currently I'm using sharding_filter() as a
            # concept demonstration. Later on minibatch distribution should be
            # merged into ItemSampler to maximize efficiency.
            item_samplers = find_dps(
                datapipe_graph,
                ItemSampler,
            )
            for item_sampler in item_samplers:
                datapipe_graph = replace_dp(
                    # ... (listing truncated)
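Step (1) in the listing inserts a `sharding_filter()` stage after each `ItemSampler` so that minibatches are spread across the worker processes. Assuming the round-robin semantics of torchdata's `ShardingFilter` (item i is kept by the instance whose id equals i modulo the number of instances), the distribution can be sketched in plain Python; `shard` is a hypothetical helper, not a GraphBolt or torchdata function.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shard(minibatches: Iterable[T], num_workers: int, worker_id: int) -> Iterator[T]:
    """Yield the minibatches assigned to one worker, round-robin.

    Minibatch i goes to worker i % num_workers, so the workers receive
    disjoint subsets that together cover every minibatch.
    """
    for i, minibatch in enumerate(minibatches):
        if i % num_workers == worker_id:
            yield minibatch


minibatches = [f"mb{i}" for i in range(7)]
per_worker: List[List[str]] = [list(shard(minibatches, 3, w)) for w in range(3)]
print(per_worker)
# [['mb0', 'mb3', 'mb6'], ['mb1', 'mb4'], ['mb2', 'mb5']]
```

In this scheme every worker still enumerates the full minibatch stream and discards the items it does not own, which is presumably why the TODO proposes merging minibatch distribution into ItemSampler.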

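The `max_uva_threads` docstring says the default of 10240 threads corresponds to around 5-7 Streaming Multiprocessors. One plausible reading, not necessarily how the value was derived, is plain division by the maximum number of resident threads per SM, assuming the UVA copy kernel fully occupies every SM it uses. The per-SM limits below (2048 for compute capability 8.0, 1536 for 8.6) are taken from the CUDA compute-capability table and are assumptions about the target GPUs; `sms_occupied` is a hypothetical helper.

```python
DEFAULT_MAX_UVA_THREADS = 10240  # default from the DataLoader signature

# Assumed maximum resident threads per SM, per compute capability.
MAX_THREADS_PER_SM = {
    "compute capability 8.0 (e.g. A100)": 2048,
    "compute capability 8.6 (e.g. RTX 30 series)": 1536,
}


def sms_occupied(threads: int, threads_per_sm: int) -> float:
    """Number of SMs that `threads` resident threads fill at full occupancy."""
    return threads / threads_per_sm


for arch, per_sm in MAX_THREADS_PER_SM.items():
    print(f"{arch}: {sms_occupied(DEFAULT_MAX_UVA_THREADS, per_sm):.2f} SMs")
# compute capability 8.0 (e.g. A100): 5.00 SMs
# compute capability 8.6 (e.g. RTX 30 series): 6.67 SMs
```

Under these assumptions the default lands between 5 and about 6.7 SMs, consistent with the "5-7" range; on a GPU with many SMs that leaves most of the device free for the computations the copies are meant to overlap with.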