(
proxy: Proxy,
ws_uri: WebSocketURI,
deadline: Deadline,
*,
user_agent_header: str | None = None,
ssl: ssl_module.SSLContext | None = None,
server_hostname: str | None = None,
**kwargs: Any,
)
| 512 | |
| 513 | |
def connect_http_proxy(
    proxy: Proxy,
    ws_uri: WebSocketURI,
    deadline: Deadline,
    *,
    user_agent_header: str | None = None,
    ssl: ssl_module.SSLContext | None = None,
    server_hostname: str | None = None,
    **kwargs: Any,
) -> socket.socket:
    """
    Connect to an HTTP(S) proxy and send it a CONNECT request.

    Args:
        proxy: Proxy to connect to. Its ``host``, ``port``, and ``scheme``
            attributes are read here.
        ws_uri: Forwarded to ``prepare_connect_request``.
        deadline: Its ``timeout()`` bounds the TCP connection (unless
            ``kwargs`` overrides ``timeout``) and the TLS handshake. It is
            also forwarded to ``read_connect_response``.
        user_agent_header: Forwarded to ``prepare_connect_request``.
        ssl: TLS context used when ``proxy.scheme`` is ``"https"``. A default
            context is created when omitted.
        server_hostname: Host name passed to the TLS handshake with the
            proxy. Defaults to ``proxy.host``.
        **kwargs: Passed to :func:`socket.create_connection`.

    Returns:
        The connected socket, wrapped in TLS when the proxy scheme is
        ``"https"``, once ``read_connect_response`` has returned.

    Raises:
        Exception: Any error raised after the TCP connection is established
            is re-raised unchanged, after the socket has been closed.

    """
    # Connect socket

    kwargs.setdefault("timeout", deadline.timeout())
    sock = socket.create_connection((proxy.host, proxy.port), **kwargs)

    # From here on the socket is open: every failure path must close it,
    # not only a failure while reading the proxy's response.
    try:
        # Initialize TLS wrapper and perform TLS handshake

        if proxy.scheme == "https":
            if ssl is None:
                ssl = ssl_module.create_default_context()
            if server_hostname is None:
                server_hostname = proxy.host
            # Bound the handshake by the remaining time, then switch back to
            # blocking mode.
            sock.settimeout(deadline.timeout())
            sock = ssl.wrap_socket(sock, server_hostname=server_hostname)
            sock.settimeout(None)

        # Send CONNECT request to the proxy and read response.

        sock.sendall(prepare_connect_request(proxy, ws_uri, user_agent_header))
        read_connect_response(sock, deadline)
    except Exception:
        # `sock` is whichever socket object is current: the plain one if the
        # TLS wrap did not complete, the TLS one otherwise.
        sock.close()
        raise

    return sock
| 550 | |
| 551 | |
| 552 | T = TypeVar("T") |
no test coverage detected
searching dependent graphs…