Execute any READY tasks that are engine specific (for example, gateways or script tasks). This is done in a loop, so it will keep completing those tasks until there are only READY User tasks, or WAITING tasks left. :param will_complete_task: Callback that will be called prior to completing a task :param did_complete_task: Callback that will be called after completing a task
(self, will_complete_task=None, did_complete_task=None)
| 155 | return [t.task_spec.event_definition.details(t) for t in iter] |
| 156 | |
def do_engine_steps(self, will_complete_task=None, did_complete_task=None):
    """
    Run every READY task that the engine can complete by itself (for
    example, gateways or script tasks), repeating until the only tasks
    remaining are READY manual tasks or WAITING tasks.

    :param will_complete_task: Callback that will be called prior to completing a task
    :param did_complete_task: Callback that will be called after completing a task
    """
    def run_ready_engine_tasks(workflow):
        # One pass over the READY tasks of `workflow`; returns how many
        # non-manual tasks were run during this pass.
        completed = 0
        for ready_task in workflow.get_tasks(state=TaskState.READY):
            if ready_task.task_spec.manual:
                # Manual tasks are never run by the engine.
                continue
            if will_complete_task is not None:
                will_complete_task(ready_task)
            ready_task.run()
            completed += 1
            if did_complete_task is not None:
                did_complete_task(ready_task)
        return completed

    # Snapshot the active subprocesses; its size is compared further down
    # to detect subprocesses that were started during this call.
    initial_subprocesses = self.get_active_subprocesses()
    deepest_first = sorted(initial_subprocesses, key=lambda v: v.depth, reverse=True)
    for subworkflow in deepest_first:
        # Keep making passes over this subprocess until one runs nothing.
        while run_ready_engine_tasks(subworkflow) > 0:
            pass
        parent_task_id = subworkflow.parent_task_id
        if parent_task_id is not None:
            # Have the parent task's spec re-evaluate the task that owns
            # this subprocess.
            parent_task = self.get_task_from_id(parent_task_id)
            parent_task.task_spec._update(parent_task)

    # Go around again if this workflow ran anything itself, or if there are
    # now more active subprocesses than in the snapshot taken above.
    needs_another_round = run_ready_engine_tasks(self) > 0
    if not needs_another_round:
        needs_another_round = len(self.get_active_subprocesses()) > len(initial_subprocesses)
    if needs_another_round:
        self.do_engine_steps(will_complete_task, did_complete_task)
| 192 | |
| 193 | def refresh_waiting_tasks(self, will_refresh_task=None, did_refresh_task=None): |
| 194 | """ |