Initialize all data sources from config
(self)
| 393 | self.duckdb_conn = duckdb.connect(":memory:") |
| 394 | |
| 395 | def connect(self): # noqa: C901 |
| 396 | """Initialize all data sources from config""" |
| 397 | # Useful debugging query - log current DuckDB state (uncomment to enable) |
| 398 | # tables_df = self.duckdb_conn.execute(""" |
| 399 | # SELECT |
| 400 | # table_name, |
| 401 | # column_count, |
| 402 | # estimated_size as size_b |
| 403 | # FROM duckdb_tables() |
| 404 | # """).df() |
| 405 | |
| 406 | # logger.info(f"Current DuckDB state - {len(tables_df)} tables:") |
| 407 | # for _, row in tables_df.iterrows(): |
| 408 | # logger.info( |
| 409 | # f"Table: {row['table_name']}, Columns: {row['column_count']}, Estimated Size: {row['size_b']:.2f}B" |
| 410 | # ) |
| 411 | |
| 412 | config = self._load_sources() |
| 413 | |
| 414 | # Only process sources that are new or have changed |
| 415 | for name, source_config in config.items(): |
| 416 | if "type" not in source_config: |
| 417 | continue |
| 418 | |
| 419 | if not self._has_source_changed(name, source_config): |
| 420 | logger.debug(f"Skipping unchanged source: {name}") |
| 421 | continue |
| 422 | |
| 423 | if name in self.sources: |
| 424 | self._drop_source_table(self.sources[name]) |
| 425 | |
| 426 | source_type = source_config["type"] |
| 427 | logger.info(f"Initializing/updating source: {name} ({source_type})") |
| 428 | |
| 429 | try: |
| 430 | if source_type == "csv": |
| 431 | cfg = CSVConfig(path=source_config["path"]) |
| 432 | self.sources[name] = CSVSource(name, cfg, self.duckdb_conn) |
| 433 | |
| 434 | elif source_type == "json": |
| 435 | cfg = JSONConfig( |
| 436 | path=source_config["path"], |
| 437 | record_path=source_config.get("record_path"), |
| 438 | flatten=source_config.get("flatten", True), |
| 439 | ) |
| 440 | self.sources[name] = JSONSource(name, cfg, self.duckdb_conn) |
| 441 | |
| 442 | elif source_type == "postgres": |
| 443 | cfg = PostgresConfig( |
| 444 | host=source_config["host"], |
| 445 | port=source_config["port"], |
| 446 | dbname=source_config["dbname"], |
| 447 | user=source_config["user"], |
| 448 | password=source_config["password"], |
| 449 | ) |
| 450 | self.sources[name] = PostgresSource(name, cfg, self.duckdb_conn) |
| 451 | |
| 452 | elif source_type == "clickhouse": |
no test coverage detected