hub / github.com/sartography/SpiffWorkflow / _check_threshold_structured

Method _check_threshold_structured

SpiffWorkflow/specs/Join.py:156–181
(self, my_task)

Source from the content-addressed store, hash-verified

        return True

    def _check_threshold_structured(self, my_task):
        # Retrieve a list of all activated tasks from the associated task that did the conditional parallel split.
        split_task = my_task.find_ancestor(self.split_task)
        if split_task is None:
            raise WorkflowException(f'Split task {self.split_task} which was not reached', task_spec=self)
        tasks = split_task.task_spec._get_activated_tasks(split_task, my_task)

        # The default threshold is the number of branches that were started.
        threshold = valueof(my_task, self.threshold)
        if threshold is None:
            threshold = len(tasks)

        # Look up which tasks have already completed.
        waiting_tasks = []
        completed = 0

        for task in tasks:
            if self._branch_is_complete(task):
                completed += 1
            elif not self._branch_may_merge(task):
                completed += 1
            else:
                waiting_tasks.append(task)

        # If the threshold was reached, get ready to fire.
        return completed >= threshold, waiting_tasks

    def _update_hook(self, my_task):
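
The counting rule in the listing above can be tried in isolation. The following is a minimal sketch, not SpiffWorkflow code: `Branch` and `check_threshold` are hypothetical stand-ins, and the `complete` and `may_merge` fields only play the role of the results of `_branch_is_complete` and `_branch_may_merge`, whose real behavior is not shown here.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Branch:
    """Hypothetical stand-in for one activated task of the split."""
    name: str
    complete: bool = False   # stands in for _branch_is_complete(task)
    may_merge: bool = True   # stands in for _branch_may_merge(task)


def check_threshold(branches: List[Branch],
                    threshold: Optional[int] = None) -> Tuple[bool, List[Branch]]:
    """Apply the same counting rule as the listing to stand-in data."""
    # With no explicit threshold, every activated branch must be counted.
    if threshold is None:
        threshold = len(branches)

    waiting = []
    completed = 0
    for branch in branches:
        if branch.complete:
            completed += 1
        elif not branch.may_merge:
            # As in the listing, such a branch also counts toward the threshold.
            completed += 1
        else:
            waiting.append(branch)

    return completed >= threshold, waiting


branches = [
    Branch("a", complete=True),
    Branch("b", may_merge=False),
    Branch("c"),
]

fires, waiting = check_threshold(branches)
print(fires, [b.name for b in waiting])   # False ['c']

fires2, _ = check_threshold(branches, threshold=2)
print(fires2)                             # True
```

With the default threshold, all three branches must be counted, so the one waiting branch keeps the join from firing. An explicit threshold of 2 is already met by the two counted branches.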

Callers 1

_update_hook · Method · 0.95

Calls 6

_branch_is_complete · Method · 0.95
_branch_may_merge · Method · 0.95
WorkflowException · Class · 0.85
valueof · Function · 0.85
find_ancestor · Method · 0.80
_get_activated_tasks · Method · 0.45

Tested by

no test coverage detected