(
cls, database_info: DatabaseConnection, refresh_connection=False
)
| 67 | |
@classmethod
def get_sql_engine(
    cls, database_info: DatabaseConnection, refresh_connection=False
) -> "SQLDatabase":
    """Return a connected database wrapper for ``database_info``.

    Lookup order:

    1. If ``database_info.id`` is already registered in
       ``DBConnections.db_connections`` and ``refresh_connection`` is
       false, the cached object is probed and returned.
    2. If ``database_info.use_ssh`` is truthy, the connection is built by
       ``cls.from_uri_ssh`` and registered.
    3. Otherwise the encrypted ``connection_uri`` is decrypted, an optional
       credentials file is fetched from S3, and the connection is built by
       ``cls.from_uri``, probed, and registered.

    Args:
        database_info: Connection record; this method reads its ``id``,
            ``alias``, ``use_ssh``, ``connection_uri`` and
            ``path_to_credentials_file`` attributes.
        refresh_connection: When true, skip the cache and build a new
            connection.

    Returns:
        The cached or newly created database object.

    Raises:
        SSHInvalidDatabaseConnectionError: If building the SSH connection
            fails for any reason.
        InvalidDBConnectionError: If the direct connection cannot be built
            or the probe connection fails.
    """
    logger.info(f"Connecting db: {database_info.id}")
    try:
        if (
            database_info.id
            and database_info.id in DBConnections.db_connections
            and not refresh_connection
        ):
            sql_database = DBConnections.db_connections[database_info.id]
            # Liveness probe only: open a connection and release it right
            # away so the probe does not hold a pooled connection.
            with sql_database.engine.connect():
                pass
            return sql_database
    except OperationalError:
        # The cached connection is no longer usable; fall through and
        # build a fresh one instead of failing the request.
        logger.warning(
            "Cached connection for db %s failed its probe; reconnecting",
            database_info.id,
        )

    fernet_encrypt = FernetEncrypt()
    try:
        if database_info.use_ssh:
            engine = cls.from_uri_ssh(database_info)
            DBConnections.add(database_info.id, engine)
            return engine
    except Exception as e:
        raise SSHInvalidDatabaseConnectionError(
            "Invalid SSH connection", description=str(e)
        ) from e
    try:
        db_uri = unquote(fernet_encrypt.decrypt(database_info.connection_uri))

        file_path = database_info.path_to_credentials_file
        if file_path and file_path.lower().startswith("s3"):
            # S3().download returns the path that replaces the s3 location
            # (presumably a local file path; confirm against S3.download).
            file_path = S3().download(file_path)

        if db_uri.lower().startswith("bigquery"):
            # Use "&" when the URI already has a query string so the
            # credentials parameter does not produce a second "?".
            separator = "&" if "?" in db_uri else "?"
            db_uri = f"{db_uri}{separator}credentials_path={file_path}"

        engine = cls.from_uri(db_uri)
        # Probe the new connection, then release it immediately.
        with engine.engine.connect():
            pass
        DBConnections.add(database_info.id, engine)
    except Exception as e:
        # Chain the cause, matching the SSH branch above, so the original
        # traceback is preserved for debugging.
        raise InvalidDBConnectionError(
            f"Unable to connect to db: {database_info.alias}", description=str(e)
        ) from e
    return engine
| 114 | |
| 115 | @classmethod |
| 116 | def extract_parameters(cls, input_string): |
no test coverage detected