MCPcopy Index your code
hub / github.com/StackStorm/st2 / request_task_execution

Function request_task_execution

st2common/st2common/services/workflows.py:570–682  ·  view source on GitHub ↗
(wf_ex_db, st2_ctx, task_ex_req)

Source from the content-addressed store, hash-verified

568
569
570def request_task_execution(wf_ex_db, st2_ctx, task_ex_req):
571 task_id = task_ex_req["id"]
572 task_route = task_ex_req["route"]
573 task_spec = task_ex_req["spec"]
574 task_ctx = task_ex_req["ctx"]
575 task_actions = task_ex_req["actions"]
576 task_delay = task_ex_req.get("delay")
577
578 msg = 'Processing task execution request for task "%s", route "%s".'
579 update_progress(wf_ex_db, msg % (task_id, str(task_route)), stream=False)
580
581 # Use existing task execution when task is with items and still running.
582 task_ex_dbs = wf_db_access.TaskExecution.query(
583 workflow_execution=str(wf_ex_db.id),
584 task_id=task_id,
585 task_route=task_route,
586 order_by=["-start_timestamp"],
587 )
588
589 if (
590 len(task_ex_dbs) > 0
591 and task_ex_dbs[0].itemized
592 and task_ex_dbs[0].status == ac_const.LIVEACTION_STATUS_RUNNING
593 ):
594 task_ex_db = task_ex_dbs[0]
595 task_ex_id = str(task_ex_db.id)
596 msg = 'Task execution "%s" retrieved for task "%s", route "%s".'
597 update_progress(wf_ex_db, msg % (task_ex_id, task_id, str(task_route)))
598 else:
599 # Create a record for task execution.
600 task_ex_db = wf_db_models.TaskExecutionDB(
601 workflow_execution=str(wf_ex_db.id),
602 task_name=task_spec.name or task_id,
603 task_id=task_id,
604 task_route=task_route,
605 task_spec=task_spec.serialize(),
606 delay=task_delay,
607 itemized=task_spec.has_items(),
608 items_count=task_ex_req.get("items_count"),
609 items_concurrency=task_ex_req.get("concurrency"),
610 context=task_ctx,
611 status=statuses.REQUESTED,
612 )
613
614 # Prepare the result format for itemized task execution.
615 if task_ex_db.itemized:
616 task_ex_db.result = {"items": [None] * task_ex_db.items_count}
617
618 # Insert new record into the database.
619 task_ex_db = wf_db_access.TaskExecution.insert(task_ex_db, publish=False)
620 task_ex_id = str(task_ex_db.id)
621 msg = 'Task execution "%s" created for task "%s", route "%s".'
622 update_progress(wf_ex_db, msg % (task_ex_id, task_id, str(task_route)))
623
624 try:
625 # Return here if no action is specified in task spec.
626 if task_spec.action is None:
627 msg = 'Task "%s", route "%s", is action less and succeed by default.'

Callers 1

request_next_tasks — Function · 0.85

Calls 11

update_progress — Function · 0.85
update_task_execution — Function · 0.85
update_task_state — Function · 0.85
request_action_execution — Function · 0.85
serialize — Method · 0.80
get — Method · 0.45
query — Method · 0.45
insert — Method · 0.45
update — Method · 0.45
get_by_id — Method · 0.45

Tested by

no test coverage detected