(
task_ex_id, ac_ex_status, ac_ex_result=None, ac_ex_ctx=None, publish=True
)
| 1013 | wait_jitter_max=cfg.CONF.workflow_engine.retry_max_jitter_msec, |
| 1014 | ) |
| 1015 | def update_task_state( |
| 1016 | task_ex_id, ac_ex_status, ac_ex_result=None, ac_ex_ctx=None, publish=True |
| 1017 | ): |
| 1018 | # Return if action execution status is not in the list of statuses to process. |
| 1019 | statuses_to_process = copy.copy(ac_const.LIVEACTION_COMPLETED_STATES) + [ |
| 1020 | ac_const.LIVEACTION_STATUS_PAUSED, |
| 1021 | ac_const.LIVEACTION_STATUS_PENDING, |
| 1022 | ] |
| 1023 | |
| 1024 | if ac_ex_status not in statuses_to_process: |
| 1025 | return |
| 1026 | |
| 1027 | # Refresh records |
| 1028 | task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id) |
| 1029 | conductor, wf_ex_db = refresh_conductor(task_ex_db.workflow_execution) |
| 1030 | |
| 1031 | # Update task flow if task execution is completed or paused. |
| 1032 | msg = 'Publish task "%s", route "%s", with status "%s" to conductor.' |
| 1033 | msg = msg % (task_ex_db.task_id, str(task_ex_db.task_route), task_ex_db.status) |
| 1034 | update_progress(wf_ex_db, msg, stream=False) |
| 1035 | |
| 1036 | if not ac_ex_ctx or "item_id" not in ac_ex_ctx or ac_ex_ctx["item_id"] < 0: |
| 1037 | ac_ex_event = events.ActionExecutionEvent(ac_ex_status, result=ac_ex_result) |
| 1038 | else: |
| 1039 | accumulated_result = [ |
| 1040 | item.get("result") if item else None for item in task_ex_db.result["items"] |
| 1041 | ] |
| 1042 | |
| 1043 | ac_ex_event = events.TaskItemActionExecutionEvent( |
| 1044 | ac_ex_ctx["item_id"], |
| 1045 | ac_ex_status, |
| 1046 | result=ac_ex_result, |
| 1047 | accumulated_result=accumulated_result, |
| 1048 | ) |
| 1049 | |
| 1050 | update_progress(wf_ex_db, conductor.serialize(), severity="debug", stream=False) |
| 1051 | conductor.update_task_state(task_ex_db.task_id, task_ex_db.task_route, ac_ex_event) |
| 1052 | |
| 1053 | # Update workflow execution and related liveaction and action execution. |
| 1054 | update_execution_records( |
| 1055 | wf_ex_db, |
| 1056 | conductor, |
| 1057 | update_lv_ac_on_statuses=statuses_to_process, |
| 1058 | pub_lv_ac=publish, |
| 1059 | pub_ac_ex=publish, |
| 1060 | ) |
| 1061 | |
| 1062 | |
| 1063 | @retrying.retry( |
no test coverage detected