MCPcopy Index your code
hub / github.com/tanelpoder/0xtools / _discover_all_schemas

Method _discover_all_schemas

xtop/core/query_engine.py:323–376  ·  view source on GitHub ↗

Discover schemas for all data sources

(self)

Source from the content-addressed store, hash-verified

321 self.query_builder.use_materialized = use_materialized
322
def _discover_all_schemas(self):
    """Discover the column schema of every data source and cache it.

    For each entry in ``self.DATA_SOURCES`` a query is taken from the
    source's SQL fragment file, ``#XTOP_DATADIR#`` is replaced with the
    data directory, and ``DESCRIBE (<query> LIMIT 0)`` is executed on a
    connection obtained from ``self.data_source``. The first two fields of
    each result row (column name, column type) are stored as a list of
    tuples in ``self.schema_cache[source_name]``.

    Special cases:

    * ``partitions`` is never queried; its schema is hard-coded.
    * A source whose fragment file does not exist, or whose non-base
      fragment contains no ``SELECT ... FROM read_csv_auto(...)``, is
      skipped with a warning and gets no cache entry.
    * If the CSV-based query fails, it is retried once with
      ``read_parquet`` / ``*.parquet`` substituted in. If that fails as
      well (or there was nothing to substitute), the error is logged and
      an empty schema is cached for the source.
    """
    # Loop-invariant: compile the extraction pattern once.
    select_pattern = re.compile(
        r'SELECT.*?FROM\s+read_csv_auto\([^)]+\)',
        re.DOTALL | re.IGNORECASE,
    )

    def describe(query):
        # LIMIT 0 makes the engine resolve column names/types without
        # returning any data rows.
        conn = self.data_source.connect()
        return conn.execute(f"DESCRIBE ({query} LIMIT 0)").fetchall()

    for source_name, source_info in self.DATA_SOURCES.items():
        if source_name == 'partitions':
            # The partitions file requires special parsing, so its schema
            # cannot be auto-discovered and is hard-coded instead.
            self.schema_cache[source_name] = [
                ('dev_maj', 'INTEGER'),
                ('dev_min', 'INTEGER'),
                ('devname', 'VARCHAR'),
            ]
            continue

        fragment_path = self.fragments_path / source_info['fragment']
        if not fragment_path.exists():
            self.logger.warning(f"Fragment file not found: {fragment_path}")
            continue

        with open(fragment_path, 'r') as f:
            fragment = f.read()

        if source_info['is_base']:
            # Base fragment is a complete query - use it directly.
            query = fragment
        else:
            # Join fragment - pull out only the inner SELECT.
            match = select_pattern.search(fragment)
            if match is None:
                self.logger.warning(f"Could not extract SELECT from fragment: {source_name}")
                continue
            query = match.group(0)
        query = query.replace('#XTOP_DATADIR#', str(self.data_source.datadir))

        try:
            result = describe(query)
        except Exception as csv_error:
            # Best-effort discovery: never let one broken source abort the
            # rest; fall back to parquet files when the CSV read fails.
            pq_query = (query
                        .replace('read_csv_auto', 'read_parquet')
                        .replace('*.csv', '*.parquet'))
            if pq_query == query:
                # Nothing to rewrite, so a retry would run the exact same
                # failing statement again.
                self.logger.error(f"Error discovering schema for {source_name}: {csv_error}")
                result = []
            else:
                self.logger.debug(
                    f"CSV schema discovery failed for {source_name}, "
                    f"trying parquet: {csv_error}"
                )
                try:
                    result = describe(pq_query)
                except Exception as pq_error:
                    # Report both failures; the CSV one is often the real cause.
                    self.logger.error(
                        f"Error discovering schema for {source_name}: {pq_error} "
                        f"(CSV attempt failed with: {csv_error})"
                    )
                    result = []

        self.schema_cache[source_name] = [(row[0], row[1]) for row in result]
        if result:
            self.logger.debug(f"Discovered schema for {source_name}: {len(result)} columns")
377
378 def get_column_to_source_mapping(self) -> Dict[str, str]:
379 """Get mapping of column names to their source data files"""

Callers 1

__init__Method · 0.95

Calls 1

errorMethod · 0.80

Tested by

no test coverage detected