(*, dsn, host, port, user,
password, passfile, database, ssl,
service, servicefile,
direct_tls, server_settings,
target_session_attrs, krbsrvname, gsslib)
| 274 | |
| 275 | |
| 276 | def _parse_connect_dsn_and_args(*, dsn, host, port, user, |
| 277 | password, passfile, database, ssl, |
| 278 | service, servicefile, |
| 279 | direct_tls, server_settings, |
| 280 | target_session_attrs, krbsrvname, gsslib): |
| 281 | # `auth_hosts` is the version of host information for the purposes |
| 282 | # of reading the pgpass file. |
| 283 | auth_hosts = None |
| 284 | sslcert = sslkey = sslrootcert = sslcrl = sslpassword = None |
| 285 | ssl_min_protocol_version = ssl_max_protocol_version = None |
| 286 | sslnegotiation = None |
| 287 | |
| 288 | if dsn: |
| 289 | parsed = urllib.parse.urlparse(dsn) |
| 290 | |
| 291 | query = None |
| 292 | if parsed.query: |
| 293 | query = urllib.parse.parse_qs(parsed.query, strict_parsing=True) |
| 294 | for key, val in query.items(): |
| 295 | if isinstance(val, list): |
| 296 | query[key] = val[-1] |
| 297 | |
| 298 | if 'service' in query: |
| 299 | val = query.pop('service') |
| 300 | if not service and val: |
| 301 | service = val |
| 302 | |
| 303 | connection_service_file = servicefile |
| 304 | |
| 305 | if connection_service_file is None: |
| 306 | connection_service_file = os.getenv('PGSERVICEFILE') |
| 307 | |
| 308 | if connection_service_file is None: |
| 309 | homedir = compat.get_pg_home_directory() |
| 310 | if homedir: |
| 311 | connection_service_file = homedir / PG_SERVICEFILE |
| 312 | else: |
| 313 | connection_service_file = None |
| 314 | else: |
| 315 | connection_service_file = pathlib.Path(connection_service_file) |
| 316 | |
| 317 | if parsed.scheme not in {'postgresql', 'postgres'}: |
| 318 | raise exceptions.ClientConfigurationError( |
| 319 | 'invalid DSN: scheme is expected to be either ' |
| 320 | '"postgresql" or "postgres", got {!r}'.format(parsed.scheme)) |
| 321 | |
| 322 | if parsed.netloc: |
| 323 | if '@' in parsed.netloc: |
| 324 | dsn_auth, _, dsn_hostspec = parsed.netloc.partition('@') |
| 325 | else: |
| 326 | dsn_hostspec = parsed.netloc |
| 327 | dsn_auth = '' |
| 328 | else: |
| 329 | dsn_auth = dsn_hostspec = '' |
| 330 | |
| 331 | if dsn_auth: |
| 332 | dsn_user, _, dsn_password = dsn_auth.partition(':') |
| 333 | else: |
no test coverage detected
searching dependent graphs…