Discover schemas for all data sources
(self)
| 321 | self.query_builder.use_materialized = use_materialized |
| 322 | |
| 323 | def _discover_all_schemas(self): |
| 324 | """Discover schemas for all data sources""" |
| 325 | for source_name, source_info in self.DATA_SOURCES.items(): |
| 326 | if source_name == 'partitions': |
| 327 | # Special handling for partitions - hardcode the schema |
| 328 | # The partitions file requires special parsing, so we can't auto-discover |
| 329 | self.schema_cache[source_name] = [ |
| 330 | ('dev_maj', 'INTEGER'), |
| 331 | ('dev_min', 'INTEGER'), |
| 332 | ('devname', 'VARCHAR') |
| 333 | ] |
| 334 | continue |
| 335 | |
| 336 | # Build a simple query to get schema |
| 337 | fragment_path = self.fragments_path / source_info['fragment'] |
| 338 | if not fragment_path.exists(): |
| 339 | self.logger.warning(f"Fragment file not found: {fragment_path}") |
| 340 | continue |
| 341 | |
| 342 | with open(fragment_path, 'r') as f: |
| 343 | fragment = f.read() |
| 344 | |
| 345 | # Extract the SELECT statement from the fragment |
| 346 | if source_info['is_base']: |
| 347 | # Base query - use directly |
| 348 | query = fragment.replace('#XTOP_DATADIR#', str(self.data_source.datadir)) |
| 349 | else: |
| 350 | # Extract the SELECT from within the JOIN |
| 351 | match = re.search(r'SELECT.*?FROM\s+read_csv_auto\([^)]+\)', fragment, re.DOTALL | re.IGNORECASE) |
| 352 | if match: |
| 353 | query = match.group(0).replace('#XTOP_DATADIR#', str(self.data_source.datadir)) |
| 354 | else: |
| 355 | self.logger.warning(f"Could not extract SELECT from fragment: {source_name}") |
| 356 | continue |
| 357 | |
| 358 | # Get schema using DESCRIBE, with parquet fallback if CSV not available |
| 359 | def _describe(q: str): |
| 360 | conn = self.data_source.connect() |
| 361 | return conn.execute(f"DESCRIBE ({q} LIMIT 0)").fetchall() |
| 362 | try: |
| 363 | result = _describe(query) |
| 364 | except Exception as e: |
| 365 | # Attempt parquet fallback by replacing CSV reader/patterns |
| 366 | try: |
| 367 | pq_query = (query |
| 368 | .replace('read_csv_auto', 'read_parquet') |
| 369 | .replace('*.csv', '*.parquet')) |
| 370 | result = _describe(pq_query) |
| 371 | except Exception as e2: |
| 372 | self.logger.error(f"Error discovering schema for {source_name}: {e2}") |
| 373 | result = [] |
| 374 | self.schema_cache[source_name] = [(row[0], row[1]) for row in result] |
| 375 | if result: |
| 376 | self.logger.debug(f"Discovered schema for {source_name}: {len(result)} columns") |
| 377 | |
| 378 | def get_column_to_source_mapping(self) -> Dict[str, str]: |
| 379 | """Get mapping of column names to their source data files""" |