MCPcopy
hub / github.com/dmlc/dgl / __init__

Method __init__

python/dgl/distributed/dist_context.py:117–145  ·  view source on GitHub ↗

Customized worker pool init function

(self, num_workers, rpc_config)

Source from the content-addressed store, hash-verified

115 """Customized worker pool"""
116
117 def __init__(self, num_workers, rpc_config):
118 """
119 Customized worker pool init function
120 """
121 ctx = mp.get_context("spawn")
122 self.num_workers = num_workers
123 # As pool could be used by any number of dataloaders, queues
124 # should be able to take infinite elements to avoid dead lock.
125 self.queue_size = 0
126 self.result_queue = ctx.Queue(self.queue_size)
127 self.results = {} # key is dataloader name, value is fetched batch.
128 self.task_queues = []
129 self.process_list = []
130 self.current_proc_id = 0
131 self.cache_result_dict = {}
132 self.barrier = ctx.Barrier(num_workers)
133 for _ in range(num_workers):
134 task_queue = ctx.Queue(self.queue_size)
135 self.task_queues.append(task_queue)
136 proc = ctx.Process(
137 target=init_process,
138 args=(
139 rpc_config,
140 (self.result_queue, task_queue, self.barrier),
141 ),
142 )
143 proc.daemon = True
144 proc.start()
145 self.process_list.append(proc)
146
147 def set_collate_fn(self, func, dataloader_name):
148 """Set collate function in subprocess"""

Callers

nothing calls this directly

Calls 2

appendMethod · 0.80
startMethod · 0.80

Tested by

no test coverage detected