Copy a value from the task data to the workflow data
(self, my_task)
| 76 | logger.info(f'Read workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id})) |
| 77 | |
def set(self, my_task):
    """Copy a value from the task data to the workflow data.

    The value stored under ``self.bpmn_id`` in ``my_task.data`` is deep-copied
    into the ``data_objects`` of the first workflow -- starting at
    ``my_task.workflow`` and following ``parent_workflow`` links -- whose spec
    lists this id in ``spec.data_objects``.  The key is then removed from the
    task data.

    Raises:
        WorkflowDataException: if ``self.bpmn_id`` is not present in the task
            data, or if no workflow in the parent chain declares it.
    """
    if self.bpmn_id not in my_task.data:
        message = f"A data object could not be set; '{self.bpmn_id}' not exist in the task."
        raise WorkflowDataException(message, my_task, data_output=self)

    # Walk outwards through the enclosing workflows until one declares this
    # data object in its spec; ``wf`` ends up as None if none of them do.
    wf = my_task.workflow
    while wf is not None and self.bpmn_id not in wf.spec.data_objects:
        wf = wf.parent_workflow

    if wf is None:
        # Without this guard the assignment below would fail with an opaque
        # AttributeError on None instead of a workflow-level error.
        message = (
            f"A data object could not be set; '{self.bpmn_id}' is not defined "
            "in the workflow or any of its parent workflows."
        )
        raise WorkflowDataException(message, my_task, data_output=self)

    # Store an independent copy first, then drop the task-level entry, so the
    # task keeps its value if the copy fails.
    wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
    del my_task.data[self.bpmn_id]
    logger.info('Set workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))
| 92 | |
def delete(self, my_task):
    """Remove this object's entry from the task data, if it is present."""
    task_data = my_task.data
    # A default is supplied to pop() so a missing key is silently ignored.
    task_data.pop(self.bpmn_id, None)
no test coverage detected