(self, size=MTU, **kwargs)
| 2051 | return self |
| 2052 | |
def read_packet(self, size=MTU, **kwargs):
    # type: (int, **Any) -> Packet
    """
    Read one record through the parent class and return it as a packet.

    The raw bytes are dissected with the layer class registered in
    ``conf.l2types.num2layer`` for the record's link type; ``kwargs`` are
    forwarded to that class. If dissection raises an ``Exception`` and
    ``conf.debug_dissector`` is falsy, the bytes are wrapped in
    ``conf.raw_layer`` instead.

    The record metadata (timestamp, wire length, comments, direction,
    process information and interface name) is then copied onto the
    packet before it is returned.

    :param size: forwarded to the parent ``_read_packet``
    :param kwargs: extra keyword arguments for the link-layer class
    :raises EOFError: when the parent ``_read_packet`` returns None
    """
    record = super(PcapNgReader, self)._read_packet(size=size)
    if record is None:
        raise EOFError

    raw, meta = record
    (linktype, tsresol, tshigh, tslow, wirelen,
     comments, ifname, direction, process_information) = meta

    try:
        layer_cls = conf.l2types.num2layer[linktype]  # type: Type[Packet]
        pkt = layer_cls(raw, **kwargs)  # type: Packet
    except KeyboardInterrupt:
        raise
    except Exception:
        if conf.debug_dissector:
            # Debug mode: surface the dissection error to the caller.
            raise
        if conf.raw_layer is None:
            # conf.raw_layer is set on import
            import scapy.packet  # noqa: F401
        pkt = conf.raw_layer(raw)

    if tshigh is not None:
        # The timestamp arrives as two halves; tshigh is shifted up by
        # 32 bits and tslow is added before dividing by the resolution.
        ticks = (tshigh << 32) + tslow
        pkt.time = EDecimal(ticks) / tsresol
    pkt.wirelen = wirelen
    pkt.comments = comments
    pkt.direction = direction
    # Store a copy so the packet does not share the record's object.
    pkt.process_information = process_information.copy()
    if ifname is not None:
        pkt.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
    return pkt
| 2080 | |
def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
    """
    Return the next packet by delegating to :meth:`read_packet`.

    ``size`` and any extra keyword arguments are forwarded unchanged.
    """
    pkt = self.read_packet(size=size, **kwargs)
    return pkt
no test coverage detected