Execute SQL query via QuestDB HTTP API.
(host: str, port: int, sql: str)
| 94 | |
| 95 | |
| 96 | def execute_sql(host: str, port: int, sql: str) -> dict: |
| 97 | """Execute SQL query via QuestDB HTTP API.""" |
| 98 | url = f"http://{host}:{port}/exec" |
| 99 | params = {"query": sql} |
| 100 | response = requests.get(url, params=params, timeout=120) |
| 101 | if not response.ok: |
| 102 | raise RuntimeError(f"SQL failed: {response.text}") |
| 103 | return response.json() |
| 104 | |
| 105 | |
| 106 | def export_parquet(host: str, port: int, query: str, output_path: str) -> None: |
no test coverage detected
searching dependent graphs…