(task_ex_id, ac_ex_status, ac_ex_result=None, ac_ex_ctx=None)
| 1219 | wait_jitter_max=cfg.CONF.workflow_engine.retry_max_jitter_msec, |
| 1220 | ) |
| 1221 | def update_task_execution(task_ex_id, ac_ex_status, ac_ex_result=None, ac_ex_ctx=None): |
| 1222 | if ac_ex_status not in statuses.COMPLETED_STATUSES + [ |
| 1223 | statuses.PAUSED, |
| 1224 | statuses.PENDING, |
| 1225 | ]: |
| 1226 | return |
| 1227 | |
| 1228 | task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id) |
| 1229 | wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(task_ex_db.workflow_execution) |
| 1230 | |
| 1231 | # Treat the update of task with items but items list is empty like a normal task execution. |
| 1232 | if not task_ex_db.itemized or (task_ex_db.itemized and task_ex_db.items_count == 0): |
| 1233 | if ac_ex_status != task_ex_db.status: |
| 1234 | msg = 'Updating task execution "%s" for task "%s" from status "%s" to "%s".' |
| 1235 | msg = msg % ( |
| 1236 | task_ex_id, |
| 1237 | task_ex_db.task_id, |
| 1238 | task_ex_db.status, |
| 1239 | ac_ex_status, |
| 1240 | ) |
| 1241 | update_progress(wf_ex_db, msg) |
| 1242 | |
| 1243 | task_ex_db.status = ac_ex_status |
| 1244 | task_ex_db.result = ac_ex_result if ac_ex_result else task_ex_db.result |
| 1245 | elif task_ex_db.itemized and ac_ex_ctx: |
| 1246 | if "orquesta" not in ac_ex_ctx or "item_id" not in ac_ex_ctx["orquesta"]: |
| 1247 | msg = "Context information for the item is not provided. %s" % str( |
| 1248 | ac_ex_ctx |
| 1249 | ) |
| 1250 | update_progress(wf_ex_db, msg, severity="error", log=False) |
| 1251 | raise Exception(msg) |
| 1252 | |
| 1253 | item_id = ac_ex_ctx["orquesta"]["item_id"] |
| 1254 | |
| 1255 | msg = 'Processing action execution for task "%s", route "%s", item "%s".' |
| 1256 | msg = msg % (task_ex_db.task_id, str(task_ex_db.task_route), item_id) |
| 1257 | update_progress(wf_ex_db, msg, severity="debug") |
| 1258 | |
| 1259 | task_ex_db.result["items"][item_id] = { |
| 1260 | "status": ac_ex_status, |
| 1261 | "result": ac_ex_result, |
| 1262 | } |
| 1263 | |
| 1264 | item_statuses = [ |
| 1265 | item.get("status", statuses.UNSET) if item else statuses.UNSET |
| 1266 | for item in task_ex_db.result["items"] |
| 1267 | ] |
| 1268 | |
| 1269 | task_completed = all( |
| 1270 | [status in statuses.COMPLETED_STATUSES for status in item_statuses] |
| 1271 | ) |
| 1272 | |
| 1273 | if task_completed: |
| 1274 | new_task_status = ( |
| 1275 | statuses.SUCCEEDED |
| 1276 | if all([status == statuses.SUCCEEDED for status in item_statuses]) |
| 1277 | else statuses.FAILED |
| 1278 | ) |
no test coverage detected