(wf_def, ac_ex_db, st2_ctx, notify_cfg=None)
| 220 | |
| 221 | |
| 222 | def request(wf_def, ac_ex_db, st2_ctx, notify_cfg=None): |
| 223 | LOG.info("[%s] Processing action execution request for workflow.", str(ac_ex_db.id)) |
| 224 | |
| 225 | # Load workflow definition into workflow spec model. |
| 226 | spec_module = specs_loader.get_spec_module("native") |
| 227 | wf_spec = spec_module.instantiate(wf_def) |
| 228 | |
| 229 | # Inspect the workflow spec. |
| 230 | inspect(wf_spec, st2_ctx, raise_exception=True) |
| 231 | |
| 232 | # Identify the action to execute. |
| 233 | action_db = action_utils.get_action_by_ref(ref=ac_ex_db.action["ref"]) |
| 234 | |
| 235 | if not action_db: |
| 236 | error = 'Unable to find action "%s".' % ac_ex_db.action["ref"] |
| 237 | raise ac_exc.InvalidActionReferencedException(error) |
| 238 | |
| 239 | # Identify the runner for the action. |
| 240 | runner_type_db = action_utils.get_runnertype_by_name(action_db.runner_type["name"]) |
| 241 | |
| 242 | # Render action execution parameters. |
| 243 | runner_params, action_params = param_utils.render_final_params( |
| 244 | runner_type_db.runner_parameters, |
| 245 | action_db.parameters, |
| 246 | ac_ex_db.parameters, |
| 247 | ac_ex_db.context, |
| 248 | ) |
| 249 | |
| 250 | # Instantiate the workflow conductor. |
| 251 | conductor_params = {"inputs": action_params, "context": st2_ctx} |
| 252 | conductor = conducting.WorkflowConductor(wf_spec, **conductor_params) |
| 253 | |
| 254 | # Serialize the conductor which initializes some internal values. |
| 255 | data = conductor.serialize() |
| 256 | |
| 257 | # Create a record for workflow execution. |
| 258 | wf_ex_db = wf_db_models.WorkflowExecutionDB( |
| 259 | action_execution=str(ac_ex_db.id), |
| 260 | spec=data["spec"], |
| 261 | graph=data["graph"], |
| 262 | input=data["input"], |
| 263 | context=data["context"], |
| 264 | state=data["state"], |
| 265 | status=data["state"]["status"], |
| 266 | output=data["output"], |
| 267 | errors=data["errors"], |
| 268 | ) |
| 269 | |
| 270 | # Inspect that the list of tasks in the notify parameter exist in the workflow spec. |
| 271 | if runner_params.get("notify"): |
| 272 | invalid_tasks = list( |
| 273 | set(runner_params.get("notify")) - set(wf_spec.tasks.keys()) |
| 274 | ) |
| 275 | |
| 276 | if invalid_tasks: |
| 277 | raise wf_exc.WorkflowExecutionException( |
| 278 | "The following tasks in the notify parameter do not exist " |
| 279 | "in the workflow definition: %s." % ", ".join(invalid_tasks) |
nothing calls this directly
no test coverage detected