Update command executor following directConnect feature
(self, keep_alive: bool)
| 297 | delattr(WebDriver, method_name) |
| 298 | |
| 299 | def _update_command_executor(self, keep_alive: bool) -> None: |
| 300 | """Update command executor following directConnect feature""" |
| 301 | direct_protocol = 'directConnectProtocol' |
| 302 | direct_host = 'directConnectHost' |
| 303 | direct_port = 'directConnectPort' |
| 304 | direct_path = 'directConnectPath' |
| 305 | |
| 306 | if not self.caps: |
| 307 | raise ValueError('Driver capabilities must be defined') |
| 308 | if not {direct_protocol, direct_host, direct_port, direct_path}.issubset(set(self.caps)): |
| 309 | message = 'Direct connect capabilities from server were:\n' |
| 310 | for key in [direct_protocol, direct_host, direct_port, direct_path]: |
| 311 | message += f"{key}: '{self.caps.get(key, '')}' " |
| 312 | logger.debug(message) |
| 313 | return |
| 314 | |
| 315 | protocol = self.caps[direct_protocol] |
| 316 | hostname = self.caps[direct_host] |
| 317 | port = self.caps[direct_port] |
| 318 | path = self.caps[direct_path] |
| 319 | executor = f'{protocol}://{hostname}:{port}{path}' |
| 320 | |
| 321 | logger.debug('Updated request endpoint to %s', executor) |
| 322 | # Override command executor. |
| 323 | if isinstance(self.command_executor, AppiumConnection): # type: ignore |
| 324 | self.command_executor = AppiumConnection(executor, keep_alive=keep_alive) |
| 325 | else: |
| 326 | self.command_executor = RemoteConnection(executor, keep_alive=keep_alive) |
| 327 | self._add_commands() |
| 328 | |
| 329 | # https://github.com/SeleniumHQ/selenium/blob/06fdf2966df6bca47c0ae45e8201cd30db9b9a49/py/selenium/webdriver/remote/webdriver.py#L277 |
| 330 | # noinspection PyAttributeOutsideInit |
no test coverage detected