Executes a pipeline via a subprocess, stores subprocess PID, parameters, and waits for completion, capturing combined stdout/stderr to a log file. Sets the final status (COMPLETED, FAILED, REVOKED) in Redis.
(
task: "Task",
node_path: str,
config_path: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
)
| 97 | |
| 98 | @huey.task(context=True) |
| 99 | def run_pipeline_task( |
| 100 | task: "Task", |
| 101 | node_path: str, |
| 102 | config_path: Optional[str] = None, |
| 103 | parameters: Optional[Dict[str, Any]] = None, |
| 104 | ): |
| 105 | """ |
| 106 | Executes a pipeline via a subprocess, stores subprocess PID, parameters, |
| 107 | and waits for completion, capturing combined stdout/stderr to a log file. |
| 108 | Sets the final status (COMPLETED, FAILED, REVOKED) in Redis. |
| 109 | """ |
| 110 | task_id = str(task.id) |
| 111 | redis_pid_key = f"worker_pid_for_task:{task_id}" # Key now stores *subprocess* PID |
| 112 | redis_output_dir_key = ( |
| 113 | f"output_dir_for_task:{task_id}" # Key for output dir mapping |
| 114 | ) |
| 115 | redis_params_key = f"parameters_for_task:{task_id}" # Key for parameters |
| 116 | redis_status_key = f"status_for_task:{task_id}" # Key for final status |
| 117 | process = None # Initialize process variable |
| 118 | log_file: Optional[io.TextIOWrapper] = None # Initialize log file handle |
| 119 | log_file_path = "" # Store path for logging |
| 120 | final_status_set = False # Flag to ensure final status is set only once |
| 121 | |
| 122 | print( |
| 123 | f"Task {task_id}: Preparing to run pipeline subprocess for node: {node_path}" |
| 124 | ) |
| 125 | |
| 126 | # Initialize progress early to indicate the task is starting |
| 127 | try: |
| 128 | set_progress(task_id, 0.0, "Initializing task...") |
| 129 | print(f"Task {task_id}: Initial progress set to 0.0.") |
| 130 | except Exception as e: |
| 131 | print(f"Task {task_id}: Failed to set initial progress: {e}") |
| 132 | |
| 133 | if parameters is None: |
| 134 | parameters = {} |
| 135 | |
| 136 | print(f"Task {task_id}: Original parameters (first 500 chars): {str(parameters)[:500]}") |
| 137 | |
| 138 | # first, flatten parameters |
| 139 | no_flatten_keys = parameters.get("no_flatten", []) |
| 140 | print(f"Task {task_id}: About to call flatten_config. No_flatten keys: {no_flatten_keys}") |
| 141 | try: |
| 142 | parameters_flat = flatten_config(parameters, no_flatten_keys=no_flatten_keys) |
| 143 | print(f"Task {task_id}: Successfully called flatten_config. Flat params (first 500 chars): {str(parameters_flat)[:500]}") |
| 144 | except Exception as fc_e: |
| 145 | print(f"Task {task_id}: Error during flatten_config: {fc_e}", exc_info=True) |
| 146 | # Set final status to FAILED |
| 147 | set_final_status( |
| 148 | task_id, |
| 149 | "FAILED", |
| 150 | f"Task failed during parameter flattening: {fc_e}", |
| 151 | details={"error": str(fc_e), "traceback": traceback.format_exc()}, |
| 152 | ) |
| 153 | final_status_set = True # Mark as set |
| 154 | raise # Re-raise to be caught by the main handler |
| 155 | |
| 156 | if "task_id" not in parameters_flat: |
no test coverage detected