A stateful pcap reader. Each packet is returned as a string
| 1401 | |
| 1402 | |
| 1403 | class RawPcapReader(metaclass=PcapReader_metaclass): |
| 1404 | """A stateful pcap reader. Each packet is returned as a string""" |
| 1405 | |
| 1406 | # TODO: use Generics to properly type the various readers. |
| 1407 | # As of right now, RawPcapReader is typed as if it returned packets |
| 1408 | # because all of its child do. Fix that |
| 1409 | |
| 1410 | nonblocking_socket = True |
| 1411 | PacketMetadata = collections.namedtuple("PacketMetadata", |
| 1412 | ["sec", "usec", "wirelen", "caplen"]) # noqa: E501 |
| 1413 | |
| 1414 | def __init__(self, filename, fdesc=None, magic=None): # type: ignore |
| 1415 | # type: (str, _ByteStream, bytes) -> None |
| 1416 | self.filename = filename |
| 1417 | self.f = fdesc |
| 1418 | if magic == b"\xa1\xb2\xc3\xd4": # big endian |
| 1419 | self.endian = ">" |
| 1420 | self.nano = False |
| 1421 | elif magic == b"\xd4\xc3\xb2\xa1": # little endian |
| 1422 | self.endian = "<" |
| 1423 | self.nano = False |
| 1424 | elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision |
| 1425 | self.endian = ">" |
| 1426 | self.nano = True |
| 1427 | elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision # noqa: E501 |
| 1428 | self.endian = "<" |
| 1429 | self.nano = True |
| 1430 | else: |
| 1431 | raise Scapy_Exception( |
| 1432 | "Not a pcap capture file (bad magic: %r)" % magic |
| 1433 | ) |
| 1434 | hdr = self.f.read(20) |
| 1435 | if len(hdr) < 20: |
| 1436 | raise Scapy_Exception("Invalid pcap file (too short)") |
| 1437 | vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack( |
| 1438 | self.endian + "HHIIII", hdr |
| 1439 | ) |
| 1440 | self.linktype = linktype |
| 1441 | self.snaplen = snaplen |
| 1442 | |
| 1443 | def __enter__(self): |
| 1444 | # type: () -> RawPcapReader |
| 1445 | return self |
| 1446 | |
| 1447 | def __iter__(self): |
| 1448 | # type: () -> RawPcapReader |
| 1449 | return self |
| 1450 | |
| 1451 | def __next__(self): |
| 1452 | # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata] |
| 1453 | """ |
| 1454 | implement the iterator protocol on a set of packets in a pcap file |
| 1455 | """ |
| 1456 | try: |
| 1457 | return self._read_packet() |
| 1458 | except EOFError: |
| 1459 | raise StopIteration |
| 1460 |