| 250 | return self.sql_generation_repository.find_by(query) |
| 251 | |
| 252 | def execute(self, sql_generation_id: str, max_rows: int = 100) -> tuple[str, dict]: |
| 253 | sql_generation = self.sql_generation_repository.find_by_id(sql_generation_id) |
| 254 | if not sql_generation: |
| 255 | raise SQLGenerationNotFoundError( |
| 256 | f"SQL Generation {sql_generation_id} not found" |
| 257 | ) |
| 258 | prompt_repository = PromptRepository(self.storage) |
| 259 | prompt = prompt_repository.find_by_id(sql_generation.prompt_id) |
| 260 | db_connection_repository = DatabaseConnectionRepository(self.storage) |
| 261 | db_connection = db_connection_repository.find_by_id(prompt.db_connection_id) |
| 262 | database = SQLDatabase.get_sql_engine(db_connection, True) |
| 263 | return database.run_sql(sql_generation.sql, max_rows) |
| 264 | |
| 265 | def update_metadata(self, sql_generation_id, metadata_request) -> SQLGeneration: |
| 266 | sql_generation = self.sql_generation_repository.find_by_id(sql_generation_id) |