(self, my_task)
| 52 | Join.__init__(self, wf_spec, name, split_task, **kwargs) |
| 53 | |
def _start(self, my_task):
    """Check whether enough incoming branches have completed for the merge to fire.

    :param my_task: the task instance associated with this merge spec.
    :returns: ``False`` if ``my_task`` is already COMPLETED or the
        threshold has not been reached yet; ``True`` if ``my_task`` is
        already READY or the threshold has been reached.
    :raises WorkflowException: if ``my_task`` has no ancestor for
        ``self.split_task``, i.e. the split this merge joins with was
        never reached.
    """
    # If the threshold was already reached, there is nothing else to do.
    if my_task.has_state(TaskState.COMPLETED):
        return False
    if my_task.has_state(TaskState.READY):
        return True

    # Retrieve a list of all activated tasks from the associated task
    # that did the conditional parallel split.
    split_task = my_task.find_ancestor(self.split_task)
    if split_task is None:
        # Interpolate the split task name where the placeholder belongs;
        # a leftover '%s' inside an f-string is emitted verbatim.
        raise WorkflowException(
            f'Join with {self.split_task}, which was not reached',
            task_spec=self,
        )
    tasks = split_task.task_spec._get_activated_threads(split_task)

    # The default threshold is the number of threads that were started.
    threshold = valueof(my_task, self.threshold)
    if threshold is None:
        threshold = len(tasks)

    # Look up which tasks have already completed.
    waiting_tasks = []
    completed = 0
    for task in tasks:
        # Refresh path prediction before inspecting the branch.
        task.task_spec._predict(task)

        if self._branch_is_complete(task):
            completed += 1
        else:
            waiting_tasks.append(task)

    # If the threshold was reached, get ready to fire.
    if completed >= threshold:
        # If this is a cancelling join, cancel every branch that has
        # not completed yet.
        if self.cancel_remaining:
            for task in waiting_tasks:
                task.cancel()
        return True

    # We do NOT set the task state to COMPLETED, because in
    # case all other incoming tasks get cancelled (or never reach
    # the ThreadMerge for other reasons, such as reaching a stub branch),
    # we need to revisit it.
    return False
| 98 | |
| 99 | def _update_hook(self, my_task): |
| 100 |
no test coverage detected