| 183 | |
| 184 | |
class PostgresSource(DataSource):
    """Data source that reads PostgreSQL data through DuckDB's postgres_scanner extension."""

    def __init__(
        self, name: str, config: PostgresConfig, duckdb_conn: duckdb.DuckDBPyConnection
    ):
        """Install/load the postgres_scanner extension and build the connection URI.

        Args:
            name: Name of this source, forwarded to ``DataSource.__init__``.
            config: Connection settings; ``user``, ``password``, ``host``,
                ``port`` and ``dbname`` are read from it.
            duckdb_conn: DuckDB connection, forwarded to ``DataSource.__init__``.
        """
        super().__init__(name, duckdb_conn)
        self.config = config

        # Initialize postgres_scanner extension
        self._duckdb.execute("INSTALL postgres_scanner;")
        self._duckdb.execute("LOAD postgres_scanner;")

        # NOTE(review): credentials are interpolated verbatim. Characters such
        # as '@', ':' or '/' in the user/password would need percent-encoding
        # to form a valid URI -- confirm whether config values are pre-encoded.
        self._conn_string = (
            f"postgresql://{config.user}:{config.password}"
            f"@{config.host}:{config.port}/{config.dbname}"
        )

        # postgres_attach creates views in DuckDB; track whether that has
        # already happened so query() does not re-run it on every call.
        self._attached = False

    @staticmethod
    def _sql_literal(value: str) -> str:
        """Return *value* as a single-quoted SQL string literal with quotes escaped."""
        return "'" + str(value).replace("'", "''") + "'"

    def query(self, sql: str) -> pd.DataFrame:
        """Run *sql* on the DuckDB connection and return the result as a DataFrame.

        The Postgres database is attached on the first call only; re-running
        ``postgres_attach`` would try to create the same views again.
        """
        if not self._attached:
            self._duckdb.execute(
                f"CALL postgres_attach({self._sql_literal(self._conn_string)})"
            )
            self._attached = True
        return self._duckdb.execute(sql).df()

    def to_df(self, table_name: str, schema: str = "public") -> pd.DataFrame:
        """Get entire table as a DataFrame.

        Args:
            table_name: Name of the Postgres table to read.
            schema: Postgres schema containing the table.

        Returns:
            The full contents of ``schema.table_name``.

        Raises:
            Exception: If creating or reading the temporary view fails; the
                underlying error is chained as ``__cause__``.
        """
        logger.info("to_df")
        # Assigned before the try so the cleanup below can always reference it.
        view_name = f"pg_view_{uuid.uuid4().hex[:8]}"
        try:
            self._duckdb.execute(f"""
                CREATE OR REPLACE VIEW {view_name} AS
                SELECT * FROM postgres_scan(
                    {self._sql_literal(self._conn_string)},
                    {self._sql_literal(schema)},
                    {self._sql_literal(table_name)}
                )
            """)
            return self._duckdb.execute(f"SELECT * FROM {view_name}").df()
        except Exception as e:
            raise Exception(f"Error reading table {schema}.{table_name}: {e!s}") from e
        finally:
            # Best-effort cleanup: a failure to drop the temporary view must
            # not mask the original error (or discard a successful result).
            try:
                self._duckdb.execute(f"DROP VIEW IF EXISTS {view_name}")
            except Exception:
                logger.warning("Failed to drop temporary view %s", view_name)
| 226 | |
| 227 | |
| 228 | class ClickhouseSource(DataSource): |