(self, my_task)
| 154 | return True |
| 155 | |
def _check_threshold_structured(self, my_task):
    """Evaluate the join threshold against the branches started by the split.

    The split task is located among the ancestors of ``my_task`` via
    ``self.split_task``; the branches it activated are then classified as
    either finished or still awaited.

    :param my_task: the task instance for which the join is being evaluated.
    :returns: a ``(reached, waiting_tasks)`` tuple, where ``reached`` is the
        result of comparing the finished-branch count to the threshold and
        ``waiting_tasks`` lists the branches that are still awaited.
    :raises WorkflowException: if no ancestor matching ``self.split_task``
        is found.
    """
    origin = my_task.find_ancestor(self.split_task)
    if origin is None:
        raise WorkflowException(f'Split task {self.split_task} which was not reached', task_spec=self)
    branches = origin.task_spec._get_activated_tasks(origin, my_task)

    # With no explicit threshold, every started branch must be accounted for.
    required = valueof(my_task, self.threshold)
    if required is None:
        required = len(branches)

    pending = []
    finished = 0
    for branch in branches:
        # A branch is awaited only while it is incomplete *and* may still
        # merge; short-circuiting skips the merge check for completed ones.
        awaited = not self._branch_is_complete(branch) and self._branch_may_merge(branch)
        if awaited:
            pending.append(branch)
        else:
            finished += 1

    return finished >= required, pending
| 182 | |
| 183 | def _update_hook(self, my_task): |
| 184 |
no test coverage detected