Create a new workflow task
(self)
| 61 | return self._plugin_integration |
| 62 | |
| 63 | def create_task(self) -> WorkflowTask: |
| 64 | """Create a new workflow task""" |
| 65 | task_id = str(uuid.uuid4()) |
| 66 | task = WorkflowTask(task_id=task_id) |
| 67 | self._tasks[task_id] = task |
| 68 | self._subscribers[task_id] = [] |
| 69 | return task |
| 70 | |
| 71 | def get_task(self, task_id: str) -> Optional[WorkflowTask]: |
| 72 | """Get task by ID""" |
no test coverage detected