(self)
| 240 | pass |
| 241 | |
def sniff(self):
    # type: () -> None
    """Accept stream clients on ``self.port`` and answer each one.

    A listening socket of type ``self.TYPE`` is bound to the address of the
    ``iface`` sniff option (``conf.iface`` when that option is absent).
    The method then loops forever on ``accept()``; it only returns by
    propagating an exception. Each accepted client is wrapped in a
    ``StreamSocket`` built with ``self.cls`` and handed to an
    ``AsyncSniffer`` whose ``prn`` is ``self.reply`` bound to that client's
    ``send`` function and address.

    On exit every started sniffer is stopped, every client socket is
    closed, ``self.close()`` is called, and the listening socket is closed.
    """
    from scapy.supersocket import StreamSocket
    ssock = socket.socket(socket.AF_INET, self.TYPE)
    # The outer try/finally guarantees the listening socket is released even
    # when bind()/listen() fail or when the per-client cleanup itself raises.
    try:
        try:
            ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except OSError:
            # Best effort: carry on without SO_REUSEADDR if it cannot be set.
            pass
        ssock.bind(
            (get_if_addr(self.optsniff.get("iface", conf.iface)), self.port))
        ssock.listen()
        sniffers = []
        try:
            while True:
                clientsocket, address = ssock.accept()
                print("%s connected" % repr(address))
                sock = StreamSocket(clientsocket, self.cls)
                optsniff = self.optsniff.copy()
                optsniff["prn"] = functools.partial(self.reply,
                                                    send_function=sock.send,
                                                    address=address)
                # The sniffer reads from the already-opened client socket,
                # so "iface" is not forwarded. pop() tolerates the option
                # being absent, which bind() above already allows for.
                optsniff.pop("iface", None)
                sniffer = AsyncSniffer(opened_socket=sock, **optsniff)
                sniffer.start()
                sniffers.append((sniffer, sock))
        finally:
            for (sniffer, sock) in sniffers:
                try:
                    sniffer.stop()
                except Exception:
                    # Deliberately ignored so the remaining clients are
                    # still cleaned up.
                    pass
                sock.close()
            self.close()
    finally:
        ssock.close()
| 276 | |
| 277 | def sniff_bg(self): |
| 278 | # type: () -> None |
nothing calls this directly
no test coverage detected