(
wf_ex_db,
conductor,
update_lv_ac_on_statuses=None,
pub_wf_ex=False,
pub_lv_ac=True,
pub_ac_ex=True,
)
| 1394 | |
| 1395 | |
| 1396 | def update_execution_records( |
| 1397 | wf_ex_db, |
| 1398 | conductor, |
| 1399 | update_lv_ac_on_statuses=None, |
| 1400 | pub_wf_ex=False, |
| 1401 | pub_lv_ac=True, |
| 1402 | pub_ac_ex=True, |
| 1403 | ): |
| 1404 | # If the workflow execution is completed, then render the workflow output. |
| 1405 | if conductor.get_workflow_status() in statuses.COMPLETED_STATUSES: |
| 1406 | conductor.render_workflow_output() |
| 1407 | |
| 1408 | # Determine if workflow status has changed. |
| 1409 | wf_old_status = wf_ex_db.status |
| 1410 | wf_ex_db.status = conductor.get_workflow_status() |
| 1411 | status_changed = wf_old_status != wf_ex_db.status |
| 1412 | |
| 1413 | if status_changed: |
| 1414 | msg = 'Updating workflow execution from status "%s" to "%s".' |
| 1415 | update_progress(wf_ex_db, msg % (wf_old_status, wf_ex_db.status)) |
| 1416 | |
| 1417 | # Update timestamp and output if workflow is completed. |
| 1418 | if wf_ex_db.status in statuses.COMPLETED_STATUSES: |
| 1419 | wf_ex_db.end_timestamp = date_utils.get_datetime_utc_now() |
| 1420 | wf_ex_db.output = conductor.get_workflow_output() |
| 1421 | |
| 1422 | # Update task flow and other attributes. |
| 1423 | wf_ex_db.errors = copy.deepcopy(conductor.errors) |
| 1424 | wf_ex_db.state = conductor.workflow_state.serialize() |
| 1425 | |
| 1426 | # Write changes to the database. |
| 1427 | wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=pub_wf_ex) |
| 1428 | |
| 1429 | # Return if workflow execution status is not specified in update_lv_ac_on_statuses. |
| 1430 | if ( |
| 1431 | isinstance(update_lv_ac_on_statuses, list) |
| 1432 | and wf_ex_db.status not in update_lv_ac_on_statuses |
| 1433 | ): |
| 1434 | return |
| 1435 | |
| 1436 | # Update the corresponding liveaction and action execution for the workflow. |
| 1437 | wf_ac_ex_db = ex_db_access.ActionExecution.get_by_id(wf_ex_db.action_execution) |
| 1438 | wf_lv_ac_db = action_utils.get_liveaction_by_id(wf_ac_ex_db.liveaction["id"]) |
| 1439 | |
| 1440 | # Gather result for liveaction and action execution. |
| 1441 | result = {"output": wf_ex_db.output or None} |
| 1442 | |
| 1443 | if wf_ex_db.status in statuses.ABENDED_STATUSES: |
| 1444 | result["errors"] = wf_ex_db.errors |
| 1445 | |
| 1446 | if wf_ex_db.errors: |
| 1447 | msg = "Workflow execution completed with errors." |
| 1448 | update_progress(wf_ex_db, msg, severity="error") |
| 1449 | |
| 1450 | for wf_ex_error in wf_ex_db.errors: |
| 1451 | update_progress(wf_ex_db, wf_ex_error, severity="error") |
| 1452 | |
| 1453 | # Sync update with corresponding liveaction and action execution. |
no test coverage detected