MCPcopy
hub / github.com/e-p-armstrong/augmentoolkit / run_pipeline_task

Function run_pipeline_task

tasks.py:99–513  ·  view source on GitHub ↗

Executes a pipeline via a subprocess, stores subprocess PID, parameters, and waits for completion, capturing combined stdout/stderr to a log file. Sets the final status (COMPLETED, FAILED, REVOKED) in Redis.

(
    task: "Task",
    node_path: str,
    config_path: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
)

Source from the content-addressed store, hash-verified

97
98@huey.task(context=True)
99def run_pipeline_task(
100 task: "Task",
101 node_path: str,
102 config_path: Optional[str] = None,
103 parameters: Optional[Dict[str, Any]] = None,
104):
105 """
106 Executes a pipeline via a subprocess, stores subprocess PID, parameters,
107 and waits for completion, capturing combined stdout/stderr to a log file.
108 Sets the final status (COMPLETED, FAILED, REVOKED) in Redis.
109 """
110 task_id = str(task.id)
111 redis_pid_key = f"worker_pid_for_task:{task_id}" # Key now stores *subprocess* PID
112 redis_output_dir_key = (
113 f"output_dir_for_task:{task_id}" # Key for output dir mapping
114 )
115 redis_params_key = f"parameters_for_task:{task_id}" # Key for parameters
116 redis_status_key = f"status_for_task:{task_id}" # Key for final status
117 process = None # Initialize process variable
118 log_file: Optional[io.TextIOWrapper] = None # Initialize log file handle
119 log_file_path = "" # Store path for logging
120 final_status_set = False # Flag to ensure final status is set only once
121
122 print(
123 f"Task {task_id}: Preparing to run pipeline subprocess for node: {node_path}"
124 )
125
126 # Initialize progress early to indicate the task is starting
127 try:
128 set_progress(task_id, 0.0, "Initializing task...")
129 print(f"Task {task_id}: Initial progress set to 0.0.")
130 except Exception as e:
131 print(f"Task {task_id}: Failed to set initial progress: {e}")
132
133 if parameters is None:
134 parameters = {}
135
136 print(f"Task {task_id}: Original parameters (first 500 chars): {str(parameters)[:500]}")
137
138 # first, flatten parameters
139 no_flatten_keys = parameters.get("no_flatten", [])
140 print(f"Task {task_id}: About to call flatten_config. No_flatten keys: {no_flatten_keys}")
141 try:
142 parameters_flat = flatten_config(parameters, no_flatten_keys=no_flatten_keys)
143 print(f"Task {task_id}: Successfully called flatten_config. Flat params (first 500 chars): {str(parameters_flat)[:500]}")
144 except Exception as fc_e:
145 print(f"Task {task_id}: Error during flatten_config: {fc_e}", exc_info=True)
146 # Set final status to FAILED
147 set_final_status(
148 task_id,
149 "FAILED",
150 f"Task failed during parameter flattening: {fc_e}",
151 details={"error": str(fc_e), "traceback": traceback.format_exc()},
152 )
153 final_status_set = True # Mark as set
154 raise # Re-raise to be caught by the main handler
155
156 if "task_id" not in parameters_flat:

Callers 1

queue_pipeline_runFunction · 0.90

Calls 5

set_progressFunction · 0.90
flatten_configFunction · 0.90
resolve_pathFunction · 0.90
set_final_statusFunction · 0.85
find_first_output_dirFunction · 0.85

Tested by

no test coverage detected