(
url_or_config: Union[str, "URL", dict[str, str]], **engine_kwargs: Any
)
| 119 | |
@contextmanager
def client(
    url_or_config: Union[str, "URL", dict[str, str]], **engine_kwargs: Any
) -> Iterator[Client]:
    """Yield a ``Client`` bound to a freshly created engine.

    Args:
        url_or_config: Connection URL (string or ``URL`` object) or a
            config mapping; it is normalised through ``url_from_config``.
        **engine_kwargs: Extra keyword arguments forwarded unchanged to
            ``create_engine``. When ``echo`` is not supplied, it defaults
            to ``env2bool(env.DVC_SQLALCHEMY_ECHO, False)``.

    Yields:
        A ``Client`` wrapping the engine. The engine is disposed when the
        ``with`` block exits, whether normally or through an exception.
    """
    db_url = url_from_config(url_or_config)
    # A caller-supplied ``echo`` takes precedence over the env-derived default.
    engine_kwargs.setdefault("echo", env2bool(env.DVC_SQLALCHEMY_ECHO, False))

    # Only engine construction runs under ``handle_error``.
    # NOTE(review): presumably it translates connection/driver failures
    # into a project-level error -- confirm against its definition.
    with handle_error(db_url):
        db_engine = create_engine(db_url, **engine_kwargs)

    try:
        yield Client(db_engine)
    finally:
        # Always dispose of the engine, even if the caller's block raised.
        db_engine.dispose()
no test coverage detected