(wf_ex_db, st2_ctx, task_ex_req)
| 568 | |
| 569 | |
| 570 | def request_task_execution(wf_ex_db, st2_ctx, task_ex_req): |
| 571 | task_id = task_ex_req["id"] |
| 572 | task_route = task_ex_req["route"] |
| 573 | task_spec = task_ex_req["spec"] |
| 574 | task_ctx = task_ex_req["ctx"] |
| 575 | task_actions = task_ex_req["actions"] |
| 576 | task_delay = task_ex_req.get("delay") |
| 577 | |
| 578 | msg = 'Processing task execution request for task "%s", route "%s".' |
| 579 | update_progress(wf_ex_db, msg % (task_id, str(task_route)), stream=False) |
| 580 | |
| 581 | # Use existing task execution when task is with items and still running. |
| 582 | task_ex_dbs = wf_db_access.TaskExecution.query( |
| 583 | workflow_execution=str(wf_ex_db.id), |
| 584 | task_id=task_id, |
| 585 | task_route=task_route, |
| 586 | order_by=["-start_timestamp"], |
| 587 | ) |
| 588 | |
| 589 | if ( |
| 590 | len(task_ex_dbs) > 0 |
| 591 | and task_ex_dbs[0].itemized |
| 592 | and task_ex_dbs[0].status == ac_const.LIVEACTION_STATUS_RUNNING |
| 593 | ): |
| 594 | task_ex_db = task_ex_dbs[0] |
| 595 | task_ex_id = str(task_ex_db.id) |
| 596 | msg = 'Task execution "%s" retrieved for task "%s", route "%s".' |
| 597 | update_progress(wf_ex_db, msg % (task_ex_id, task_id, str(task_route))) |
| 598 | else: |
| 599 | # Create a record for task execution. |
| 600 | task_ex_db = wf_db_models.TaskExecutionDB( |
| 601 | workflow_execution=str(wf_ex_db.id), |
| 602 | task_name=task_spec.name or task_id, |
| 603 | task_id=task_id, |
| 604 | task_route=task_route, |
| 605 | task_spec=task_spec.serialize(), |
| 606 | delay=task_delay, |
| 607 | itemized=task_spec.has_items(), |
| 608 | items_count=task_ex_req.get("items_count"), |
| 609 | items_concurrency=task_ex_req.get("concurrency"), |
| 610 | context=task_ctx, |
| 611 | status=statuses.REQUESTED, |
| 612 | ) |
| 613 | |
| 614 | # Prepare the result format for itemized task execution. |
| 615 | if task_ex_db.itemized: |
| 616 | task_ex_db.result = {"items": [None] * task_ex_db.items_count} |
| 617 | |
| 618 | # Insert new record into the database. |
| 619 | task_ex_db = wf_db_access.TaskExecution.insert(task_ex_db, publish=False) |
| 620 | task_ex_id = str(task_ex_db.id) |
| 621 | msg = 'Task execution "%s" created for task "%s", route "%s".' |
| 622 | update_progress(wf_ex_db, msg % (task_ex_id, task_id, str(task_route))) |
| 623 | |
| 624 | try: |
| 625 | # Return here if no action is specified in task spec. |
| 626 | if task_spec.action is None: |
| 627 | msg = 'Task "%s", route "%s", is action less and succeed by default.' |
no test coverage detected