Context that gathers tasks which will run concurrently, potentially on multiple nodes. All tasks in the same node will share the same workspace and thus can share blobs, while tasks running in different nodes won't be able to directly share data. All tasks of the task group wil
| 156 | |
| 157 | |
| 158 | class TaskGroup(context.Managed): |
| 159 | """ |
| 160 | Context that gathers tasks which will run concurrently, potentially on |
| 161 | multiple nodes. All tasks in the same node will share the same workspace |
| 162 | and thus can share blobs, while tasks running in different nodes won't |
| 163 | be able to directly share data. |
| 164 | |
| 165 | All tasks of the task group will start concurrently, and the task group |
| 166 | will finish execution when the last task of the group finishes. |
| 167 | |
| 168 | Example: |
| 169 | # suppose that s1 ... s5 are execution steps or nets. |
| 170 | with TaskGroup() as tg: |
| 171 | # these tasks go to default node 'local' |
| 172 | Task(step=s1) |
| 173 | Task(step=s2) |
| 174 | |
| 175 | with Node('n2'): |
| 176 | Task(step=s3) |
| 177 | with Node('n1'): |
| 178 | Task(step=s4) |
| 179 | with Node('n2'): |
| 180 | Task(step=s5) |
| 181 | |
| 182 | # this will run all steps in parallel. |
| 183 | # s1 and s2 will run at default node 'local' |
| 184 | # s3 and s5 will run at node 'n2' |
| 185 | # s4 will run at node 'n1' |
| 186 | session.run(tg) |
| 187 | """ |
| 188 | LOCAL_SETUP = 'local_setup' |
| 189 | |
| 190 | def __init__(self, workspace_type=None): |
| 191 | self._plan_cache = None |
| 192 | self._tasks = [] |
| 193 | self._already_used = False |
| 194 | self._prev_active = None |
| 195 | self._tasks_to_add = [] |
| 196 | self._report_nets = {} |
| 197 | self._report_steps = [] |
| 198 | self._workspace_type = workspace_type |
| 199 | self._tasks_by_node = None |
| 200 | self._remote_nets = [] |
| 201 | |
| 202 | def add_remote_net(self, net): |
| 203 | self._remote_nets.append(net) |
| 204 | |
| 205 | def remote_nets(self): |
| 206 | return self._remote_nets |
| 207 | |
| 208 | def add(self, task): |
| 209 | assert not self._already_used, ( |
| 210 | 'Cannot add Task to an already used TaskGroup.') |
| 211 | assert ( |
| 212 | self._workspace_type is None or |
| 213 | task._workspace_type is None or |
| 214 | self._workspace_type == task._workspace_type) |
| 215 | if task._workspace_type is None: |
no outgoing calls
searching dependent graphs…