| 193 | |
| 194 | |
| 195 | class TcpConnectionTable(collections.abc.Mapping): |
| 196 | DEFAULT_TABLE_SIZE = 4096 |
| 197 | |
| 198 | def __init__(self): |
| 199 | self._tcp = MIB_TCPTABLE_OWNER_PID(self.DEFAULT_TABLE_SIZE) |
| 200 | self._tcp_size = ctypes.wintypes.DWORD(self.DEFAULT_TABLE_SIZE) |
| 201 | self._tcp6 = MIB_TCP6TABLE_OWNER_PID(self.DEFAULT_TABLE_SIZE) |
| 202 | self._tcp6_size = ctypes.wintypes.DWORD(self.DEFAULT_TABLE_SIZE) |
| 203 | self._map = {} |
| 204 | |
| 205 | def __getitem__(self, item): |
| 206 | return self._map[item] |
| 207 | |
| 208 | def __iter__(self): |
| 209 | return self._map.__iter__() |
| 210 | |
| 211 | def __len__(self): |
| 212 | return self._map.__len__() |
| 213 | |
| 214 | def refresh(self): |
| 215 | self._map = {} |
| 216 | self._refresh_ipv4() |
| 217 | self._refresh_ipv6() |
| 218 | |
| 219 | def _refresh_ipv4(self): |
| 220 | ret = ctypes.windll.iphlpapi.GetExtendedTcpTable( # type: ignore |
| 221 | ctypes.byref(self._tcp), |
| 222 | ctypes.byref(self._tcp_size), |
| 223 | False, |
| 224 | socket.AF_INET, |
| 225 | TCP_TABLE_OWNER_PID_CONNECTIONS, |
| 226 | 0, |
| 227 | ) |
| 228 | if ret == 0: |
| 229 | for row in self._tcp.table[: self._tcp.dwNumEntries]: |
| 230 | local_ip = socket.inet_ntop(socket.AF_INET, bytes(row.ucLocalAddr)) |
| 231 | local_port = socket.htons(row.dwLocalPort) |
| 232 | self._map[(local_ip, local_port)] = row.dwOwningPid |
| 233 | elif ret == ERROR_INSUFFICIENT_BUFFER: |
| 234 | self._tcp = MIB_TCPTABLE_OWNER_PID(self._tcp_size.value) |
| 235 | # no need to update size, that's already done. |
| 236 | self._refresh_ipv4() |
| 237 | else: |
| 238 | raise RuntimeError( |
| 239 | "[IPv4] Unknown GetExtendedTcpTable return code: %s" % ret |
| 240 | ) |
| 241 | |
| 242 | def _refresh_ipv6(self): |
| 243 | ret = ctypes.windll.iphlpapi.GetExtendedTcpTable( # type: ignore |
| 244 | ctypes.byref(self._tcp6), |
| 245 | ctypes.byref(self._tcp6_size), |
| 246 | False, |
| 247 | socket.AF_INET6, |
| 248 | TCP_TABLE_OWNER_PID_CONNECTIONS, |
| 249 | 0, |
| 250 | ) |
| 251 | if ret == 0: |
| 252 | for row in self._tcp6.table[: self._tcp6.dwNumEntries]: |
no outgoing calls
no test coverage detected
searching dependent graphs…