MCPcopy
hub / github.com/MagicStack/asyncpg / run_testcase

Method run_testcase

tests/test_connect.py:1131–1211  ·  view source on GitHub ↗
(self, testcase)

Source from the content-addressed store, hash-verified

1129 os.environ[key] = val
1130
1131 def run_testcase(self, testcase):
1132 env = testcase.get('env', {})
1133 test_env = {'PGHOST': None, 'PGPORT': None,
1134 'PGUSER': None, 'PGPASSWORD': None,
1135 'PGDATABASE': None, 'PGSSLMODE': None,
1136 'PGSERVICE': None, }
1137 test_env.update(env)
1138
1139 dsn = testcase.get('dsn')
1140 user = testcase.get('user')
1141 port = testcase.get('port')
1142 host = testcase.get('host')
1143 password = testcase.get('password')
1144 passfile = testcase.get('passfile')
1145 database = testcase.get('database')
1146 sslmode = testcase.get('ssl')
1147 direct_tls = testcase.get('direct_tls')
1148 server_settings = testcase.get('server_settings')
1149 target_session_attrs = testcase.get('target_session_attrs')
1150 krbsrvname = testcase.get('krbsrvname')
1151 gsslib = testcase.get('gsslib')
1152 service = testcase.get('service')
1153 servicefile = testcase.get('servicefile')
1154
1155 expected = testcase.get('result')
1156 expected_error = testcase.get('error')
1157 if expected is None and expected_error is None:
1158 raise RuntimeError(
1159 'invalid test case: either "result" or "error" key '
1160 'has to be specified')
1161 if expected is not None and expected_error is not None:
1162 raise RuntimeError(
1163 'invalid test case: either "result" or "error" key '
1164 'has to be specified, got both')
1165
1166 with contextlib.ExitStack() as es:
1167 es.enter_context(self.subTest(dsn=dsn, env=env))
1168 es.enter_context(self.environ(**test_env))
1169
1170 if expected_error:
1171 es.enter_context(self.assertRaisesRegex(*expected_error))
1172
1173 addrs, params = connect_utils._parse_connect_dsn_and_args(
1174 dsn=dsn, host=host, port=port, user=user, password=password,
1175 passfile=passfile, database=database, ssl=sslmode,
1176 direct_tls=direct_tls,
1177 server_settings=server_settings,
1178 target_session_attrs=target_session_attrs,
1179 krbsrvname=krbsrvname, gsslib=gsslib,
1180 service=service, servicefile=servicefile)
1181
1182 params = {
1183 k: v for k, v in params._asdict().items()
1184 if v is not None or (expected is not None and k in expected[1])
1185 }
1186
1187 if isinstance(params.get('ssl'), ssl.SSLContext):
1188 params['ssl'] = True

Calls 2

environMethod · 0.95
getMethod · 0.80

Tested by

no test coverage detected