Copy a value from the workflow data to the task data.
(self, my_task)
| 61 | """Copy data between process variables and tasks""" |
| 62 | |
def get(self, my_task):
    """Copy a value from the workflow data to the task data.

    Starting at the task's own workflow, walk up the ``parent_workflow``
    chain until a workflow whose spec declares ``self.bpmn_id`` is found,
    then deep-copy that workflow's stored value into ``my_task.data``
    under the same key.

    Args:
        my_task: The task receiving the value; the search starts at
            ``my_task.workflow``.

    Raises:
        WorkflowDataException: If no workflow in the chain declares the
            data object in its spec, or the declaring workflow holds no
            value for it.
    """
    # Locate the workflow whose spec declares this data object; the value
    # is read from that workflow.
    wf = my_task.workflow
    while wf is not None and self.bpmn_id not in wf.spec.data_objects:
        wf = wf.parent_workflow

    # Declared in a spec is not enough: the workflow must also hold a value.
    if wf is None or self.bpmn_id not in wf.data_objects:
        message = f"The data object could not be read; '{self.bpmn_id}' does not exist in the process."
        raise WorkflowDataException(message, my_task, data_input=self)

    # Deep copy so the task works on its own copy rather than sharing the
    # workflow's stored object.
    my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id])
    # Plain string literal: the message has no placeholders, so no f-prefix.
    logger.info('Read workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))
| 77 | |
| 78 | def set(self, my_task): |
| 79 | """Copy a value from the task data to the workflow data""" |
no test coverage detected