MCPcopy
hub / github.com/StructuredLabs/preswald / PostgresSource

Class PostgresSource

preswald/engine/managers/data.py:185–225  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

183
184
185class PostgresSource(DataSource):
186 def __init__(
187 self, name: str, config: PostgresConfig, duckdb_conn: duckdb.DuckDBPyConnection
188 ):
189 super().__init__(name, duckdb_conn)
190 self.config = config
191
192 # Initialize postgres_scanner extension
193 self._duckdb.execute("INSTALL postgres_scanner;")
194 self._duckdb.execute("LOAD postgres_scanner;")
195
196 self._conn_string = (
197 f"postgresql://{config.user}:{config.password}"
198 f"@{config.host}:{config.port}/{config.dbname}"
199 )
200
201 def query(self, sql: str) -> pd.DataFrame:
202 self._duckdb.execute(f"CALL postgres_attach('{self._conn_string}')")
203 result = self._duckdb.execute(sql).df()
204 return result
205
206 def to_df(self, table_name: str, schema: str = "public") -> pd.DataFrame:
207 """Get entire table as a DataFrame"""
208 logger.info("to_df")
209 try:
210 view_name = f"pg_view_{uuid.uuid4().hex[:8]}"
211 self._duckdb.execute(f"""
212 CREATE OR REPLACE VIEW {view_name} AS
213 SELECT * FROM postgres_scan(
214 '{self._conn_string}',
215 '{schema}',
216 '{table_name}'
217 )
218 """)
219 result = self._duckdb.execute(f"SELECT * FROM {view_name}").df()
220 self._duckdb.execute(f"DROP VIEW IF EXISTS {view_name}")
221 return result
222 except Exception as e:
223 # Clean up views if they exist
224 self._duckdb.execute(f"DROP VIEW IF EXISTS {view_name}")
225 raise Exception(f"Error reading table {schema}.{table_name}: {e!s}") from e
226
227
228class ClickhouseSource(DataSource):

Callers 1

connectMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected