MCPcopy Index your code
hub / github.com/StackStorm/st2 / request

Function request

st2common/st2common/services/workflows.py:222–313  ·  view source on GitHub ↗
(wf_def, ac_ex_db, st2_ctx, notify_cfg=None)

Source from the content-addressed store, hash-verified

220
221
222def request(wf_def, ac_ex_db, st2_ctx, notify_cfg=None):
223 LOG.info("[%s] Processing action execution request for workflow.", str(ac_ex_db.id))
224
225 # Load workflow definition into workflow spec model.
226 spec_module = specs_loader.get_spec_module("native")
227 wf_spec = spec_module.instantiate(wf_def)
228
229 # Inspect the workflow spec.
230 inspect(wf_spec, st2_ctx, raise_exception=True)
231
232 # Identify the action to execute.
233 action_db = action_utils.get_action_by_ref(ref=ac_ex_db.action["ref"])
234
235 if not action_db:
236 error = 'Unable to find action "%s".' % ac_ex_db.action["ref"]
237 raise ac_exc.InvalidActionReferencedException(error)
238
239 # Identify the runner for the action.
240 runner_type_db = action_utils.get_runnertype_by_name(action_db.runner_type["name"])
241
242 # Render action execution parameters.
243 runner_params, action_params = param_utils.render_final_params(
244 runner_type_db.runner_parameters,
245 action_db.parameters,
246 ac_ex_db.parameters,
247 ac_ex_db.context,
248 )
249
250 # Instantiate the workflow conductor.
251 conductor_params = {"inputs": action_params, "context": st2_ctx}
252 conductor = conducting.WorkflowConductor(wf_spec, **conductor_params)
253
254 # Serialize the conductor which initializes some internal values.
255 data = conductor.serialize()
256
257 # Create a record for workflow execution.
258 wf_ex_db = wf_db_models.WorkflowExecutionDB(
259 action_execution=str(ac_ex_db.id),
260 spec=data["spec"],
261 graph=data["graph"],
262 input=data["input"],
263 context=data["context"],
264 state=data["state"],
265 status=data["state"]["status"],
266 output=data["output"],
267 errors=data["errors"],
268 )
269
270 # Inspect that the list of tasks in the notify parameter exist in the workflow spec.
271 if runner_params.get("notify"):
272 invalid_tasks = list(
273 set(runner_params.get("notify")) - set(wf_spec.tasks.keys())
274 )
275
276 if invalid_tasks:
277 raise wf_exc.WorkflowExecutionException(
278 "The following tasks in the notify parameter do not exist "
279 "in the workflow definition: %s." % ", ".join(invalid_tasks)

Callers

nothing calls this directly

Calls 8

inspectFunction · 0.85
update_progressFunction · 0.85
serializeMethod · 0.80
keysMethod · 0.80
publish_statusMethod · 0.80
getMethod · 0.45
insertMethod · 0.45
updateMethod · 0.45

Tested by

no test coverage detected