A basic HTTP client. :param mech: one of HTTP_AUTH_MECHS :param ssl: whether to use HTTPS or not :param ssp: the SSP object to use for binding :param no_check_certificate: with SSL, do not check the certificate :param no_chan_bindings: force disable sending the channel bindings
| 756 | |
| 757 | |
| 758 | class HTTP_Client(object): |
| 759 | """ |
| 760 | A basic HTTP client |
| 761 | |
| 762 | :param mech: one of HTTP_AUTH_MECHS |
| 763 | :param ssl: whether to use HTTPS or not |
| 764 | :param ssp: the SSP object to use for binding |
| 765 | :param no_check_certificate: with SSL, do not check the certificate |
| 766 | :param no_chan_bindings: force disable sending the channel bindings |
| 767 | """ |
| 768 | |
def __init__(
    self,
    mech=HTTP_AUTH_MECHS.NONE,
    verb=True,
    sslcontext=None,
    ssp=None,
    no_check_certificate=False,
    no_chan_bindings=False,
):
    """
    Record the client configuration. No socket is created here.

    :param mech: one of HTTP_AUTH_MECHS, stored as ``authmethod``
    :param verb: when true, connection progress is printed
    :param sslcontext: stored as-is on the instance (presumably an
        ``ssl.SSLContext`` used for HTTPS connections - TODO confirm)
    :param ssp: the SSP object to use for binding
    :param no_check_certificate: with SSL, do not check the certificate
    :param no_chan_bindings: force disable sending the channel bindings
    """
    # Runtime state: nothing is connected or negotiated yet.
    self.sock = self._sockinfo = self.sspcontext = None
    self.chan_bindings = GSS_C_NO_CHANNEL_BINDINGS

    # Caller-provided settings, kept verbatim.
    self.authmethod = mech
    self.verb = verb
    self.ssp = ssp
    self.sslcontext = sslcontext
    self.no_chan_bindings = no_chan_bindings
    self.no_check_certificate = no_check_certificate
| 788 | |
| 789 | def _connect_or_reuse(self, host, port=None, tls=False, timeout=5): |
| 790 | # Get the port |
| 791 | if port is None: |
| 792 | port = 443 if tls else 80 |
| 793 | # If the current socket matches, keep it. |
| 794 | if self._sockinfo == (host, port): |
| 795 | return |
| 796 | # A new socket is needed |
| 797 | if self._sockinfo: |
| 798 | self.close() |
| 799 | sock = socket.socket() |
| 800 | sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
| 801 | sock.settimeout(timeout) |
| 802 | if self.verb: |
| 803 | print( |
| 804 | "\u2503 Connecting to %s on port %s%s..." |
| 805 | % ( |
| 806 | host, |
| 807 | port, |
| 808 | " with SSL" if tls else "", |
| 809 | ) |
| 810 | ) |
| 811 | sock.connect((host, port)) |
| 812 | if self.verb: |
| 813 | print( |
| 814 | conf.color_theme.green( |
| 815 | "\u2514 Connected from %s" % repr(sock.getsockname()) |
no outgoing calls
no test coverage detected