Syncs the task's children with the given list of task specs. - Add one child for each given `TaskSpec`, unless that child already exists. - Remove all children for which there is no spec in the given list, unless it is a "triggered" task. Notes: It is an error if the task has a non-predicted child that is not given in the TaskSpecs.
(self, task_specs, state=TaskState.MAYBE)
| 232 | return task |
| 233 | |
def _sync_children(self, task_specs, state=TaskState.MAYBE):
    """Syncs the task's children with the given list of task specs.

    - Add one child for each given `TaskSpec`, unless that child already exists.
    - Remove all children for which there is no spec in the given list, unless it is a "triggered" task.

    Notes:
        It is an error if the task has a non-predicted child that is not given in the TaskSpecs.

    Args:
        task_specs (iterable(`TaskSpec`)): the task specs that may become children;
            the argument is copied and never modified
        state (`TaskState`): the state to assign to newly added children and to
            existing children that match `TaskState.NOT_FINISHED_MASK`

    Raises:
        ValueError: if `task_specs` is None
        WorkflowException: if a non-triggered child that matches
            `TaskState.DEFINITE_MASK` has no corresponding spec in `task_specs`
    """
    if task_specs is None:
        raise ValueError('"task_specs" argument is None')
    # Work on a private, mutable copy. list() (rather than a slice) guarantees
    # that .remove() below is available even when a tuple or another iterable
    # is passed in, and leaves the caller's object untouched.
    new_children = list(task_specs)

    # Create a list of all children that are no longer needed. Removal is
    # deferred until after the loop (presumably because removing a task
    # alters self.children -- confirm against Workflow._remove_task).
    unneeded_children = []
    for child in self.children:
        if child.triggered:
            # Triggered tasks are never removed.
            pass
        elif child.task_spec in new_children:
            # If the task already exists, remove it from to-be-added and update its state.
            # Only one occurrence is removed, so a spec listed twice still yields a second child.
            new_children.remove(child.task_spec)
            if child.has_state(TaskState.NOT_FINISHED_MASK):
                child._set_state(state)
        else:
            if child.has_state(TaskState.DEFINITE_MASK):
                # Definite tasks must not be removed, so they HAVE to be in the given task spec list.
                raise WorkflowException(f'removal of non-predicted child {child}', task_spec=self.task_spec)
            unneeded_children.append(child)

    # Update children accordingly
    for child in unneeded_children:
        self.workflow._remove_task(child.id)
    # Whatever is left in new_children had no matching existing child.
    for task_spec in new_children:
        self._add_child(task_spec, state)
| 273 | |
| 274 | def _child_added_notify(self, child): |
| 275 | """Called by another task to let us know that a child was added.""" |
no test coverage detected