(self)
| 900 | break |
| 901 | |
| 902 | def _request_authentication(self): |
| 903 | # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse |
| 904 | if int(self.server_version.split(".", 1)[0]) >= 5: |
| 905 | self.client_flag |= CLIENT.MULTI_RESULTS |
| 906 | |
| 907 | if self.user is None: |
| 908 | raise ValueError("Did not specify a username") |
| 909 | |
| 910 | charset_id = charset_by_name(self.charset).id |
| 911 | if isinstance(self.user, str): |
| 912 | self.user = self.user.encode(self.encoding) |
| 913 | |
| 914 | # Determine flags for the initial handshake packet. |
| 915 | # CLIENT.SSL is added conditionally: for REQUIRED mode it is already set in |
| 916 | # self.client_flag, but for PREFERRED mode it is only added when the server |
| 917 | # also advertises SSL support. |
| 918 | # _do_ssl is set here and checked below for sha256_password auth. |
| 919 | client_flags = self.client_flag |
| 920 | if self.ssl: |
| 921 | if self.server_capabilities & CLIENT.SSL: |
| 922 | # SSL upgrade: include CLIENT.SSL flag and wrap the socket. |
| 923 | _do_ssl = True |
| 924 | client_flags |= CLIENT.SSL |
| 925 | elif self._ssl_required: |
| 926 | raise err.OperationalError( |
| 927 | CR.CR_SSL_CONNECTION_ERROR, |
| 928 | "SSL is required but the server doesn't support it", |
| 929 | ) |
| 930 | else: |
| 931 | # PREFERRED mode: server doesn't support SSL, fall back to non-SSL. |
| 932 | _do_ssl = False |
| 933 | else: |
| 934 | _do_ssl = False |
| 935 | |
| 936 | data_init = struct.pack( |
| 937 | "<iIB23s", client_flags, MAX_PACKET_LEN, charset_id, b"" |
| 938 | ) |
| 939 | |
| 940 | if _do_ssl: |
| 941 | self.write_packet(data_init) |
| 942 | self._sock = self.ctx.wrap_socket(self._sock, server_hostname=self.host) |
| 943 | self._rfile = self._sock.makefile("rb") |
| 944 | self._secure = True |
| 945 | |
| 946 | data = data_init + self.user + b"\0" |
| 947 | |
| 948 | authresp = b"" |
| 949 | plugin_name = None |
| 950 | |
| 951 | if self._auth_plugin_name == "": |
| 952 | plugin_name = b"" |
| 953 | authresp = _auth.scramble_native_password(self.password, self.salt) |
| 954 | elif self._auth_plugin_name == "mysql_native_password": |
| 955 | plugin_name = b"mysql_native_password" |
| 956 | authresp = _auth.scramble_native_password(self.password, self.salt) |
| 957 | elif self._auth_plugin_name == "caching_sha2_password": |
| 958 | plugin_name = b"caching_sha2_password" |
| 959 | if self.password: |
no test coverage detected