| 350 | |
| 351 | class ParquetSource(DataSource): |
def __init__(
    self, name: str, config: ParquetConfig, duckdb_conn: duckdb.DuckDBPyConnection
):
    """Load a Parquet file into a uniquely named DuckDB table.

    Args:
        name: Logical source name, forwarded to the base-class initializer.
        config: Supplies ``path`` (the Parquet file location) and
            ``columns`` (optional column subset; a falsy value selects
            every column).
        duckdb_conn: DuckDB connection, forwarded to the base-class
            initializer.

    Raises:
        Exception: If building or executing the load statement fails; the
            underlying error is chained as the cause.
    """
    super().__init__(name, duckdb_conn)
    self.path = config.path
    self.columns = config.columns
    # Backing table name: fixed prefix plus 8 random hex characters.
    self._table_name = f"parquet_{uuid.uuid4().hex[:8]}"

    try:
        # Double embedded quotes so a path containing ' or a column name
        # containing " cannot terminate the SQL literal/identifier early.
        path_literal = str(self.path).replace("'", "''")
        if self.columns:
            select_list = ", ".join(
                '"{}"'.format(str(col).replace('"', '""'))
                for col in self.columns
            )
        else:
            select_list = "*"

        # NOTE(review): self._duckdb is presumably set by the base-class
        # initializer from duckdb_conn -- confirm against DataSource.
        self._duckdb.execute(f"""
            CREATE TABLE {self._table_name} AS
            SELECT {select_list}
            FROM read_parquet('{path_literal}')
        """)
    except Exception as e:
        raise Exception(
            f"Failed to load parquet file '{self.path}' using DuckDB: {e!s}"
        ) from e
| 378 | |
| 379 | def query(self, sql: str) -> pd.DataFrame: |
| 380 | query = sql.replace(self.name, self._table_name) |