Analyze a single workflow file and extract metadata.
(self, file_path: str)
| 154 | return " ".join(readable_parts) |
| 155 | |
| 156 | def analyze_workflow_file(self, file_path: str) -> Optional[Dict[str, Any]]: |
| 157 | """Analyze a single workflow file and extract metadata.""" |
| 158 | try: |
| 159 | with open(file_path, "r", encoding="utf-8") as f: |
| 160 | data = json.load(f) |
| 161 | except (json.JSONDecodeError, UnicodeDecodeError) as e: |
| 162 | print(f"Error reading {file_path}: {str(e)}") |
| 163 | return None |
| 164 | |
| 165 | filename = os.path.basename(file_path) |
| 166 | file_size = os.path.getsize(file_path) |
| 167 | file_hash = self.get_file_hash(file_path) |
| 168 | |
| 169 | # Extract basic metadata |
| 170 | workflow = { |
| 171 | "filename": filename, |
| 172 | "name": self.format_workflow_name(filename), |
| 173 | "workflow_id": data.get("id", ""), |
| 174 | "active": data.get("active", False), |
| 175 | "nodes": data.get("nodes", []), |
| 176 | "connections": data.get("connections", {}), |
| 177 | "tags": data.get("tags", []), |
| 178 | "created_at": data.get("createdAt", ""), |
| 179 | "updated_at": data.get("updatedAt", ""), |
| 180 | "file_hash": file_hash, |
| 181 | "file_size": file_size, |
| 182 | } |
| 183 | |
| 184 | # Use JSON name if available and meaningful, otherwise use formatted filename |
| 185 | json_name = data.get("name", "").strip() |
| 186 | if ( |
| 187 | json_name |
| 188 | and json_name != filename.replace(".json", "") |
| 189 | and not json_name.startswith("My workflow") |
| 190 | ): |
| 191 | workflow["name"] = json_name |
| 192 | # If no meaningful JSON name, use formatted filename (already set above) |
| 193 | |
| 194 | # Analyze nodes |
| 195 | node_count = len(workflow["nodes"]) |
| 196 | workflow["node_count"] = node_count |
| 197 | |
| 198 | # Determine complexity |
| 199 | if node_count <= 5: |
| 200 | complexity = "low" |
| 201 | elif node_count <= 15: |
| 202 | complexity = "medium" |
| 203 | else: |
| 204 | complexity = "high" |
| 205 | workflow["complexity"] = complexity |
| 206 | |
| 207 | # Find trigger type and integrations |
| 208 | trigger_type, integrations = self.analyze_nodes(workflow["nodes"]) |
| 209 | workflow["trigger_type"] = trigger_type |
| 210 | workflow["integrations"] = list(integrations) |
| 211 | |
| 212 | # Use JSON description if available, otherwise generate one |
| 213 | json_description = data.get("description", "").strip() |
no test coverage detected