Query a data source using sql from preswald.toml by name
(sql: str, source_name: str)
| 24 | |
| 25 | |
| 26 | def query(sql: str, source_name: str) -> pd.DataFrame: |
| 27 | """ |
| 28 | Query a data source using sql from preswald.toml by name |
| 29 | """ |
| 30 | try: |
| 31 | service = PreswaldService.get_instance() |
| 32 | df_result = service.data_manager.query(sql, source_name) |
| 33 | logger.info(f"Successfully queried data source: {source_name}") |
| 34 | return df_result |
| 35 | except Exception as e: |
| 36 | logger.error(f"Error querying data source: {e}") |
| 37 | |
| 38 | |
| 39 | def get_df(source_name: str, table_name: str | None = None) -> pd.DataFrame: |
nothing calls this directly
no test coverage detected