Class TaskGroup

caffe2/python/task.py:158–351

Context that gathers tasks which will run concurrently, potentially on multiple nodes. All tasks in the same node will share the same workspace and thus can share blobs, while tasks running in different nodes won't be able to directly share data. All tasks of the task group will start concurrently, and the task group will finish execution when the last task of the group finishes.
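
The semantics described above (tasks grouped by node, a shared workspace per node, and a group that finishes with its last task) can be modelled in plain Python. This is only an illustrative sketch and does not use or reproduce caffe2: `run_group` and `put` are hypothetical helpers, threads stand in for nodes, and a dict stands in for each node's workspace of blobs.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def run_group(tasks_by_node):
    """Run every task concurrently; return once the last one finishes.

    tasks_by_node maps a node name to a list of callables. Each callable
    receives the dict that stands in for its node's shared workspace.
    (Hypothetical helper for illustration, not a caffe2 API.)
    """
    # One workspace per node: tasks on the same node see the same dict,
    # tasks on different nodes never see each other's data.
    workspaces = {node: {} for node in tasks_by_node}
    jobs = [(node, task)
            for node, tasks in tasks_by_node.items()
            for task in tasks]
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as pool:
        futures = [pool.submit(task, workspaces[node]) for node, task in jobs]
        wait(futures)  # the group is done when its last task is done
        for future in futures:
            future.result()  # re-raise any exception from a task
    return workspaces


def put(key, value):
    """Build a toy task that writes one 'blob' into its node's workspace."""
    def task(workspace):
        workspace[key] = value
    return task


result = run_group({
    'local': [put('a', 1), put('b', 2)],  # these two share a workspace
    'n2': [put('c', 3)],                  # isolated from 'local'
})
print(result)
```

In this model the two `'local'` tasks write into the same dict, while the `'n2'` task gets its own, mirroring the rule that only tasks on the same node can share blobs directly.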

Source

class TaskGroup(context.Managed):
    """
    Context that gathers tasks which will run concurrently, potentially on
    multiple nodes. All tasks in the same node will share the same workspace
    and thus can share blobs, while tasks running in different nodes won't
    be able to directly share data.

    All tasks of the task group will start concurrently, and the task group
    will finish execution when the last task of the group finishes.

    Example:
        # suppose that s1 ... s5 are execution steps or nets.
        with TaskGroup() as tg:
            # these tasks go to default node 'local'
            Task(step=s1)
            Task(step=s2)

            with Node('n2'):
                Task(step=s3)
            with Node('n1'):
                Task(step=s4)
            with Node('n2'):
                Task(step=s5)

        # this will run all steps in parallel.
        # s1 and s2 will run at default node 'local'
        # s3 and s5 will run at node 'n2'
        # s4 will run at node 'n1'
        session.run(tg)
    """
    LOCAL_SETUP = 'local_setup'

    def __init__(self, workspace_type=None):
        self._plan_cache = None
        self._tasks = []
        self._already_used = False
        self._prev_active = None
        self._tasks_to_add = []
        self._report_nets = {}
        self._report_steps = []
        self._workspace_type = workspace_type
        self._tasks_by_node = None
        self._remote_nets = []

    def add_remote_net(self, net):
        self._remote_nets.append(net)

    def remote_nets(self):
        return self._remote_nets

    def add(self, task):
        assert not self._already_used, (
            'Cannot add Task to an already used TaskGroup.')
        assert (
            self._workspace_type is None or
            task._workspace_type is None or
            self._workspace_type == task._workspace_type)
        if task._workspace_type is None:
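
The second assertion in `add` accepts a task only when the group and the task do not disagree on a workspace type. A minimal standalone sketch of that rule follows; `workspace_types_compatible` is a hypothetical name, and the strings `'private'` and `'global'` are placeholder labels chosen for this example rather than values taken from the excerpt.

```python
def workspace_types_compatible(group_type, task_type):
    """Return True when a task may join a group, per the visible assertion.

    None means "no type chosen yet", so it is compatible with anything.
    Two concrete types are compatible only when they are equal.
    (Hypothetical helper mirroring the assert in the excerpt.)
    """
    return (
        group_type is None or
        task_type is None or
        group_type == task_type)


# Placeholder labels; the excerpt does not show the real type values.
print(workspace_types_compatible(None, 'private'))
print(workspace_types_compatible('private', None))
print(workspace_types_compatible('private', 'private'))
print(workspace_types_compatible('private', 'global'))
```

Only the last call reports a conflict: both sides have committed to a type and the types differ, which is the case the assertion in `add` rejects.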

Callers 15

build · Method · 0.90
test_multi_instance · Method · 0.90
test_dequeue_many · Method · 0.90
__init__ · Method · 0.90
_task_group · Method · 0.90
init · Method · 0.90
build_cache_step · Method · 0.90
compile · Method · 0.90
test_local_session · Method · 0.90
test_composite_reader · Method · 0.90
test_runtime_threads · Method · 0.90

Calls

no outgoing calls

Tested by 9

build · Method · 0.72
test_multi_instance · Method · 0.72
test_dequeue_many · Method · 0.72
test_local_session · Method · 0.72
test_composite_reader · Method · 0.72
test_runtime_threads · Method · 0.72
_read_all_data · Method · 0.72
