| 596 | conn.set_character_set = original_set_charset # type: ignore[assignment] |
| 597 | |
def _create_ssl_ctx(self, sslp: dict) -> ssl.SSLContext:
    """Build the client-side TLS context described by ``sslp``.

    Keys read from ``sslp``: ``ca``, ``capath``, ``check_hostname``,
    ``cert``, ``key``, ``cipher`` and ``tls_version``.

    Security-relevant behaviour:

    * When neither ``ca`` nor ``capath`` is supplied, certificate
      verification and hostname checking are both switched off. The
      session is then encrypted but the server is not authenticated.
    * When a CA source is supplied the chain is always verified;
      ``check_hostname`` (default ``True``) only controls name matching.
    * TLS 1.2 is the floor unless ``tls_version`` pins one exact version.
      Pinning "TLSv1" or "TLSv1.1" lowers the floor to a deprecated
      protocol (RFC 8996).
    * An unrecognised ``tls_version`` is only logged; no pin is applied
      and the TLS 1.2 floor stays in place.
    """
    ca_file = sslp.get("ca")
    ca_dir = sslp.get("capath")
    verify_peer = ca_file is not None or ca_dir is not None

    ctx = ssl.create_default_context(cafile=ca_file, capath=ca_dir)

    # check_hostname is assigned before verify_mode: the ssl module refuses
    # CERT_NONE while hostname checking is still enabled.
    if verify_peer:
        ctx.check_hostname = sslp.get("check_hostname", True)
        ctx.verify_mode = ssl.CERT_REQUIRED
    else:
        # No trust anchor configured: peer authentication is disabled.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

    if "cert" in sslp:
        # Client certificate for mutual TLS; the key file is optional.
        ctx.load_cert_chain(sslp["cert"], keyfile=sslp.get("key"))
    if "cipher" in sslp:
        ctx.set_ciphers(sslp["cipher"])

    # Default protocol floor, overridden below only by an explicit pin.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2

    if "tls_version" not in sslp:
        return ctx

    requested = sslp["tls_version"]
    pinnable = (
        ("TLSv1", ssl.TLSVersion.TLSv1),
        ("TLSv1.1", ssl.TLSVersion.TLSv1_1),
        ("TLSv1.2", ssl.TLSVersion.TLSv1_2),
        ("TLSv1.3", ssl.TLSVersion.TLSv1_3),
    )
    for label, version in pinnable:
        if requested == label:
            # Pin to exactly this version: same minimum and maximum.
            ctx.minimum_version = version
            ctx.maximum_version = version
            break
    else:
        _logger.error("Invalid tls version: %s", requested)

    return ctx
| 631 | |
| 632 | def close(self) -> None: |
| 633 | if self.conn is not None: |