hub / github.com/sartography/SpiffWorkflow / _sync_children

Method _sync_children

SpiffWorkflow/task.py:234–272

Syncs the task's children with the given list of task specs.

- Add one child for each given `TaskSpec`, unless that child already exists.
- Remove all children for which there is no spec in the given list, unless it is a "triggered" task.

Notes: It is an error if the task has a non-predicted child that is not given in the TaskSpecs.
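The add/remove rules above can be sketched with a small stand-alone model. This is only an illustration under stated assumptions: `ToyTask`, its `definite` and `triggered` flags, and `sync_children` are hypothetical names, not SpiffWorkflow's API, and the sketch leaves out the state update that the real method applies to children that already exist.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not SpiffWorkflow's Task class."""
    spec: str
    definite: bool = False   # True once the task is more than a prediction
    triggered: bool = False  # triggered children are never removed
    children: list = field(default_factory=list)

    def sync_children(self, specs):
        if specs is None:
            raise ValueError('"specs" argument is None')
        to_add = list(specs)  # copy, so the caller's list is left untouched
        kept = []
        for child in self.children:
            if child.triggered:
                # Triggered children always survive the sync.
                kept.append(child)
            elif child.spec in to_add:
                # Child already exists: keep it and do not add it again.
                to_add.remove(child.spec)
                kept.append(child)
            elif child.definite:
                # A non-predicted child without a matching spec is an error.
                raise RuntimeError(f"removal of non-predicted child {child.spec}")
            # Otherwise: a predicted child with no matching spec is dropped.
        # Children are only replaced once the whole scan has succeeded.
        self.children = kept + [ToyTask(s) for s in to_add]


parent = ToyTask(
    "start",
    children=[ToyTask("a"), ToyTask("b"), ToyTask("t", triggered=True)],
)
parent.sync_children(["b", "c"])
result = [c.spec for c in parent.children]
print(result)  # ['b', 't', 'c']
```

Here `a` is a predicted child with no matching spec, so it is dropped; `b` is kept; `t` survives because it is triggered; and `c` is newly added.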

(self, task_specs, state=TaskState.MAYBE)

Source from the content-addressed store, hash-verified

232         return task
233
234     def _sync_children(self, task_specs, state=TaskState.MAYBE):
235         """Syncs the task's children with the given list of task specs.
236
237         - Add one child for each given `TaskSpec`, unless that child already exists.
238         - Remove all children for which there is no spec in the given list, unless it is a "triggered" task.
239
240         Notes:
241             It is an error if the task has a non-predicted child that is not given in the TaskSpecs.
242
243         Args:
244             task_spec (list(`TaskSpec`)): the list of task specs that may become children
245             state (`TaskState`): the state to assign
246         """
247         if task_specs is None:
248             raise ValueError('"task_specs" argument is None')
249         new_children = task_specs[:]
250
251         # Create a list of all children that are no longer needed.
252         unneeded_children = []
253         for child in self.children:
254             if child.triggered:
255                 # Triggered tasks are never removed.
256                 pass
257             elif child.task_spec in new_children:
258                 # If the task already exists, remove it from to-be-added and update its state
259                 new_children.remove(child.task_spec)
260                 if child.has_state(TaskState.NOT_FINISHED_MASK):
261                     child._set_state(state)
262             else:
263                 if child.has_state(TaskState.DEFINITE_MASK):
264                     # Definite tasks must not be removed, so they HAVE to be in the given task spec list.
265                     raise WorkflowException(f'removal of non-predicted child {child}', task_spec=self.task_spec)
266                 unneeded_children.append(child)
267
268         # Update children accordingly
269         for child in unneeded_children:
270             self.workflow._remove_task(child.id)
271         for task_spec in new_children:
272             self._add_child(task_spec, state)
273
274     def _child_added_notify(self, child):
275         """Called by another task to let us know that a child was added."""
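The two `has_state` checks in the listing read most naturally as bit-mask tests. The sketch below shows that idea under loud assumptions: `ToyState`, its bit values, and the way the masks are composed are all hypothetical, and the listing does not show how `TaskState` or `has_state` are actually defined.

```python
from enum import IntFlag


class ToyState(IntFlag):
    """Hypothetical bit values; the real TaskState constants may differ."""
    MAYBE = 1
    LIKELY = 2
    FUTURE = 4
    WAITING = 8
    READY = 16
    COMPLETED = 32


# Assumed composition: predicted states are guesses, definite states are
# committed but unfinished, and "not finished" covers both groups.
PREDICTED_MASK = ToyState.MAYBE | ToyState.LIKELY
DEFINITE_MASK = ToyState.FUTURE | ToyState.WAITING | ToyState.READY
NOT_FINISHED_MASK = PREDICTED_MASK | DEFINITE_MASK


def has_state(current, mask):
    """Return True when the current state shares at least one bit with the mask."""
    return (current & mask) != 0


# A predicted child may be re-stated or removed by a sync.
maybe_is_unfinished = has_state(ToyState.MAYBE, NOT_FINISHED_MASK)
likely_is_definite = has_state(ToyState.LIKELY, DEFINITE_MASK)
# A definite child must not be removed, so dropping it would raise.
ready_is_definite = has_state(ToyState.READY, DEFINITE_MASK)
# A finished child keeps its state even when its spec is still listed.
completed_is_unfinished = has_state(ToyState.COMPLETED, NOT_FINISHED_MASK)
```

Under these assumptions, only unfinished children have their state overwritten by the sync, and only predicted children can be silently removed.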

Callers 14

_predict_hook · Method · 0.80
_run_hook · Method · 0.80
_predict_hook · Method · 0.80
_predict_hook · Method · 0.80
_run_hook · Method · 0.80
_run_hook · Method · 0.80
_run_hook · Method · 0.80
_predict_hook · Method · 0.80
_run_hook · Method · 0.80
_predict_hook · Method · 0.80
_predict_hook · Method · 0.80
_create_subworkflow · Method · 0.80

Calls 5

_add_child · Method · 0.95
WorkflowException · Class · 0.85
has_state · Method · 0.80
_set_state · Method · 0.80
_remove_task · Method · 0.80

Tested by

no test coverage detected