(self, my_task)
| 98 | self.cancel_remaining = cancel |
| 99 | |
| 100 | def _check_threshold_unstructured(self, my_task): |
| 101 | # This method is extremely poorly named. It is called where there is no split task, but whether or not |
| 102 | # there is a known split is actually irrelevant. The distinction that actually needs to be made is |
| 103 | # "Do we have to look at unfinshed tasks to find out if any of the might pass through this task?" vs |
| 104 | # "Can we make a distinction solely by looking at our own completed inputs?" |
| 105 | |
| 106 | # The default threshold is the number of inputs. |
| 107 | threshold = valueof(my_task, self.threshold) |
| 108 | if threshold is None: |
| 109 | threshold = len(self.inputs) |
| 110 | |
| 111 | # Find all places where this task spec is used and check whether enough inputs have completed to meet the threshold |
| 112 | # Omit building the list of waiting tasks unless they need to be cancelled if the threshold is met |
| 113 | waiting_tasks = [] |
| 114 | completed = 0 |
| 115 | spec_names = [ts.name for ts in self.inputs] |
| 116 | for task in TaskIterator(my_task.workflow.task_tree, end_at_spec=self.name): |
| 117 | if not task.task_spec.name in spec_names: |
| 118 | continue |
| 119 | if task.parent is None or task.has_state(TaskState.COMPLETED): |
| 120 | completed += 1 |
| 121 | elif not task.has_state(TaskState.FINISHED_MASK) and self.cancel_remaining: |
| 122 | waiting_tasks.append(task) |
| 123 | if completed >= threshold: |
| 124 | may_fire = True |
| 125 | if not self.cancel_remaining: |
| 126 | break |
| 127 | else: |
| 128 | may_fire = False |
| 129 | |
| 130 | # If the threshold was reached, get ready to fire. |
| 131 | return may_fire, waiting_tasks |
| 132 | |
| 133 | def _branch_may_merge(self, task): |
| 134 | for child in TaskIterator(task, end_at_spec=self.name): |
no test coverage detected