(self, my_task)
| 94 | return self.subprocesses.get(my_task.id) |
| 95 | |
def delete_subprocess(self, my_task):
    """Remove the subprocess attached to ``my_task`` and all nested ones.

    The subprocess registered under ``my_task.id`` is removed from
    ``self.subprocesses``, together with every subprocess whose
    ``parent_workflow`` is that subprocess (recursively).

    Args:
        my_task: The task whose ``id`` keys the subprocess to delete.

    Returns:
        A list with the tasks of the deleted subprocess followed by the
        tasks of every nested subprocess that was deleted with it.  The
        list is empty when no subprocess is registered for ``my_task``.
    """
    subprocess = self.subprocesses.get(my_task.id)
    if subprocess is None:
        # Nothing registered for this task (e.g. it was already deleted):
        # treat the call as a no-op instead of failing on ``None``.
        return []

    # Copy so that extending the result below can never mutate a list the
    # subprocess itself may still hold on to.
    tasks = list(subprocess.get_tasks())

    # Snapshot the direct children first: the recursive calls delete
    # entries from ``self.subprocesses`` while we are still walking them.
    children = [
        child for child in self.subprocesses.values()
        if child.parent_workflow == subprocess
    ]
    for child in children:
        parent_task = self.get_task_from_id(child.parent_task_id)
        tasks.extend(self.delete_subprocess(parent_task))

    del self.subprocesses[my_task.id]
    return tasks
| 103 | |
def get_active_subprocesses(self):
    """Return the registered subprocesses that have not completed yet.

    Returns:
        A list of the values of ``self.subprocesses`` whose ``completed``
        attribute is falsy, in the mapping's iteration order.
    """
    active = []
    for workflow in self.subprocesses.values():
        if workflow.completed:
            continue
        active.append(workflow)
    return active
no test coverage detected