| 379 | self.close() |
| 380 | |
def _create_ssl_ctx(self, sslp):
    """Return the ``ssl.SSLContext`` described by *sslp*.

    :param sslp: Either a ready-made ``ssl.SSLContext``, which is returned
        unchanged, or a mapping of SSL options. The keys read from the
        mapping are ``ca``, ``capath``, ``check_hostname``,
        ``verify_mode``, ``cert``, ``key``, ``password`` and ``cipher``.
    :return: The configured ``ssl.SSLContext``.
    :raise ValueError: Propagated from the ``ssl`` module when
        ``check_hostname`` is explicitly enabled while ``verify_mode``
        asks for no certificate verification.

    ``verify_mode`` may be ``None`` (use the default), a bool, or one of
    the case-insensitive strings ``"none"``/``"0"``/``"false"``/``"no"``,
    ``"optional"``, ``"required"``/``"1"``/``"true"``/``"yes"``. Any other
    value falls back to the default, which is ``CERT_REQUIRED`` when a CA
    file or CA path is given and ``CERT_NONE`` otherwise.
    """
    if isinstance(sslp, ssl.SSLContext):
        return sslp

    ca = sslp.get("ca")
    capath = sslp.get("capath")
    hasnoca = ca is None and capath is None
    ctx = ssl.create_default_context(cafile=ca, capath=capath)

    # Python 3.13 enables VERIFY_X509_STRICT by default, but the self-signed
    # certificates that MySQL generates automatically do not pass that
    # stricter verification.
    ctx.verify_flags &= ~ssl.VERIFY_X509_STRICT

    # Resolve the requested verification mode before touching the context.
    default_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
    verify_mode_value = sslp.get("verify_mode")
    if verify_mode_value is None:
        verify_mode = default_mode
    elif isinstance(verify_mode_value, bool):
        verify_mode = ssl.CERT_REQUIRED if verify_mode_value else ssl.CERT_NONE
    else:
        if isinstance(verify_mode_value, str):
            verify_mode_value = verify_mode_value.lower()
        if verify_mode_value in ("none", "0", "false", "no"):
            verify_mode = ssl.CERT_NONE
        elif verify_mode_value == "optional":
            verify_mode = ssl.CERT_OPTIONAL
        elif verify_mode_value in ("required", "1", "true", "yes"):
            verify_mode = ssl.CERT_REQUIRED
        else:
            verify_mode = default_mode

    check_hostname = not hasnoca and sslp.get("check_hostname", True)
    if verify_mode == ssl.CERT_NONE and "check_hostname" not in sslp:
        # The ssl module refuses CERT_NONE while hostname checking is on.
        # When the caller only asked to skip certificate verification, the
        # implicit hostname-check default must not turn that into a
        # ValueError. An explicit ``check_hostname`` is left untouched so a
        # contradictory request still fails loudly.
        check_hostname = False

    # check_hostname has to be assigned before verify_mode: disabling it
    # first is what makes CERT_NONE acceptable to the ssl module.
    ctx.check_hostname = check_hostname
    ctx.verify_mode = verify_mode

    if "cert" in sslp:
        ctx.load_cert_chain(
            sslp["cert"], keyfile=sslp.get("key"), password=sslp.get("password")
        )
    if "cipher" in sslp:
        ctx.set_ciphers(sslp["cipher"])
    ctx.options |= ssl.OP_NO_SSLv2
    ctx.options |= ssl.OP_NO_SSLv3
    return ctx
| 420 | |
| 421 | def close(self): |
| 422 | """ |