Yields packets from input pcap file or device. :param str input: device or pcap file path :param bool interface: Whether input is a device. :param str bpf: Optional bpf filter. :param int count: Optional max count of packets to read before exiting. :yields: packets defined by pypacker.
(input: str, interface=False, bpf=None, count=None)
| 388 | |
| 389 | |
| 390 | def read_packets(input: str, interface=False, bpf=None, count=None) -> Iterable[dshell.Packet]: |
| 391 | """ |
| 392 | Yields packets from input pcap file or device. |
| 393 | |
| 394 | :param str input: device or pcap file path |
| 395 | :param bool interface: Whether input is a device. |
| 396 | :param str bpf: Optional bpf filter. |
| 397 | :param int count: Optional max count of packets to read before exiting. |
| 398 | |
| 399 | :yields: packets defined by pypacker. |
| 400 | NOTE: Timestamp and frame id are added to packet for convenience. |
| 401 | """ |
| 402 | |
| 403 | if interface: |
| 404 | # Listen on an interface if the option is set |
| 405 | try: |
| 406 | capture = pcapy.open_live(input, 65536, True, 1) |
| 407 | except pcapy.PcapError as e: |
| 408 | # User probably doesn't have permission to listen on interface |
| 409 | # In any case, print just the error without traceback |
| 410 | logger.error(str(e)) |
| 411 | return |
| 412 | else: |
| 413 | # Otherwise, read from pcap file(s) |
| 414 | try: |
| 415 | capture = pcapy.open_offline(input) |
| 416 | except pcapy.PcapError as e: |
| 417 | logger.error("Could not open '{}': {!s}".format(input, e)) |
| 418 | return |
| 419 | |
| 420 | # TODO: We may want to allow all packets to go through and then allow the plugin to filter |
| 421 | # them out in feed_plugin_chain(). |
| 422 | # That way our frame_id won't be out of sync from skipped packets. |
| 423 | # Try and use the first plugin's BPF as the initial filter |
| 424 | # The BPFs for other plugins will be applied along the chain as needed |
| 425 | try: |
| 426 | if bpf: |
| 427 | capture.setfilter(bpf) |
| 428 | except pcapy.PcapError as e: |
| 429 | if str(e).startswith("no VLAN support for data link type"): |
| 430 | logger.error("Cannot use VLAN filters for {!r}. Recommend running with --no-vlan argument.".format(input)) |
| 431 | return |
| 432 | elif "syntax error" in str(e) or "link layer applied in wrong context" == str(e): |
| 433 | logger.error("Could not compile BPF: {!s} ({!r})".format(e, bpf)) |
| 434 | return |
| 435 | elif "802.11 link-layer types supported only on 802.11" == str(e): |
| 436 | logger.error("BPF incompatible with pcap file: {!s}".format(e)) |
| 437 | return |
| 438 | else: |
| 439 | raise e |
| 440 | |
| 441 | # Set the datalink layer for each plugin, based on the pcapy capture. |
| 442 | # Also compile a pcapy BPF object for each. |
| 443 | datalink = capture.datalink() |
| 444 | for plugin in plugin_chain: |
| 445 | # TODO Find way around libpcap bug that segfaults when certain BPFs |
| 446 | # are used with certain datalink types |
| 447 | # (e.g. datalink=204, bpf="ip") |
no test coverage detected