Extract sessions from packets
(p)
| 619 | # type: (...) -> Dict[str, _PacketList[_Inner]] |
| 620 | if session_extractor is None: |
| 621 | def _session_extractor(p): |
| 622 | # type: (Packet) -> str |
| 623 | """Extract sessions from packets""" |
| 624 | if 'Ether' in p: |
| 625 | if 'IP' in p or 'IPv6' in p: |
| 626 | ip_src_fmt = "{IP:%IP.src%}{IPv6:%IPv6.src%}" |
| 627 | ip_dst_fmt = "{IP:%IP.dst%}{IPv6:%IPv6.dst%}" |
| 628 | addr_fmt = (ip_src_fmt, ip_dst_fmt) |
| 629 | if 'TCP' in p: |
| 630 | fmt = "TCP {}:%r,TCP.sport% > {}:%r,TCP.dport%" |
| 631 | elif 'UDP' in p: |
| 632 | fmt = "UDP {}:%r,UDP.sport% > {}:%r,UDP.dport%" |
| 633 | elif 'ICMP' in p: |
| 634 | fmt = "ICMP {} > {} type=%r,ICMP.type% code=%r," \ |
| 635 | "ICMP.code% id=%ICMP.id%" |
| 636 | elif 'ICMPv6' in p: |
| 637 | fmt = "ICMPv6 {} > {} type=%r,ICMPv6.type% " \ |
| 638 | "code=%r,ICMPv6.code%" |
| 639 | elif 'IPv6' in p: |
| 640 | fmt = "IPv6 {} > {} nh=%IPv6.nh%" |
| 641 | else: |
| 642 | fmt = "IP {} > {} proto=%IP.proto%" |
| 643 | return p.sprintf(fmt.format(*addr_fmt)) |
| 644 | elif 'ARP' in p: |
| 645 | return p.sprintf("ARP %ARP.psrc% > %ARP.pdst%") |
| 646 | else: |
| 647 | return p.sprintf("Ethernet type=%04xr,Ether.type%") |
| 648 | return "Other" |
| 649 | session_extractor = _session_extractor |
| 650 | sessions = defaultdict(self.__class__) # type: DefaultDict[str, _PacketList[_Inner]] # noqa: E501 |
| 651 | for p in self.res: |