(self)
| 240 | ) |
| 241 | |
def _refresh_ipv6(self):
    """Merge the owners of the current IPv6 TCP connections into ``self._map``.

    Queries ``GetExtendedTcpTable`` for ``AF_INET6`` and records, for every
    returned row, ``(local_ip, local_port) -> owning PID`` in ``self._map``.
    If the API reports ``ERROR_INSUFFICIENT_BUFFER``, the table buffer is
    reallocated and the query is retried.

    Raises:
        RuntimeError: if ``GetExtendedTcpTable`` returns any code other than
            success (0) or ``ERROR_INSUFFICIENT_BUFFER``.
    """
    # Retry in a loop rather than by recursion so that a table which keeps
    # growing between calls cannot deepen the call stack.
    while True:
        ret = ctypes.windll.iphlpapi.GetExtendedTcpTable(  # type: ignore
            ctypes.byref(self._tcp6),
            ctypes.byref(self._tcp6_size),
            False,  # bOrder: the rows do not need to be sorted
            socket.AF_INET6,
            TCP_TABLE_OWNER_PID_CONNECTIONS,
            0,  # Reserved
        )
        if ret == 0:
            break
        if ret != ERROR_INSUFFICIENT_BUFFER:
            raise RuntimeError(
                "[IPv6] Unknown GetExtendedTcpTable return code: %s" % ret
            )
        # The size is passed by reference and has already been updated by the
        # failed call, so only the table itself needs to be reallocated.
        self._tcp6 = MIB_TCP6TABLE_OWNER_PID(self._tcp6_size.value)

    for row in self._tcp6.table[: self._tcp6.dwNumEntries]:
        local_ip = socket.inet_ntop(socket.AF_INET6, bytes(row.ucLocalAddr))
        # The port arrives in network byte order inside a DWORD. Keep only
        # the low 16 bits before converting: socket.ntohs() raises
        # OverflowError (Python 3.10+) for anything wider.
        # NOTE(review): Microsoft documents possibly-uninitialised upper bits
        # for MIB_TCPROW; assumed to apply to the IPv6 owner-PID row as well.
        local_port = socket.ntohs(row.dwLocalPort & 0xFFFF)
        self._map[(local_ip, local_port)] = row.dwOwningPid
| 264 | |
| 265 | |
| 266 | class Redirect(threading.Thread): |
no test coverage detected