(self, csock: socket.socket)
| 69 | write(os.getpid(), self.wfile) |
| 70 | |
| 71 | def original_addr(self, csock: socket.socket): |
| 72 | ip, port = csock.getpeername()[:2] |
| 73 | ip = re.sub(r"^::ffff:(?=\d+.\d+.\d+.\d+$)", "", ip) |
| 74 | ip = ip.split("%", 1)[0] |
| 75 | with self.lock: |
| 76 | try: |
| 77 | write((ip, port), self.wfile) |
| 78 | addr = read(self.rfile) |
| 79 | if addr is None: |
| 80 | raise RuntimeError("Cannot resolve original destination.") |
| 81 | return tuple(addr) |
| 82 | except (EOFError, OSError, AttributeError): |
| 83 | self._connect() |
| 84 | return self.original_addr(csock) |
| 85 | |
| 86 | |
| 87 | class APIRequestHandler(socketserver.StreamRequestHandler): |
no test coverage detected