MCPcopy Index your code
hub / github.com/secdev/scapy / _session_extractor

Method _session_extractor

scapy/plist.py:621–648  ·  view source on GitHub ↗

Extract sessions from packets

(p)

Source from the content-addressed store, hash-verified

619 # type: (...) -> Dict[str, _PacketList[_Inner]]
620 if session_extractor is None:
621 def _session_extractor(p):
622 # type: (Packet) -> str
623 """Extract sessions from packets"""
624 if 'Ether' in p:
625 if 'IP' in p or 'IPv6' in p:
626 ip_src_fmt = "{IP:%IP.src%}{IPv6:%IPv6.src%}"
627 ip_dst_fmt = "{IP:%IP.dst%}{IPv6:%IPv6.dst%}"
628 addr_fmt = (ip_src_fmt, ip_dst_fmt)
629 if 'TCP' in p:
630 fmt = "TCP {}:%r,TCP.sport% > {}:%r,TCP.dport%"
631 elif 'UDP' in p:
632 fmt = "UDP {}:%r,UDP.sport% > {}:%r,UDP.dport%"
633 elif 'ICMP' in p:
634 fmt = "ICMP {} > {} type=%r,ICMP.type% code=%r," \
635 "ICMP.code% id=%ICMP.id%"
636 elif 'ICMPv6' in p:
637 fmt = "ICMPv6 {} > {} type=%r,ICMPv6.type% " \
638 "code=%r,ICMPv6.code%"
639 elif 'IPv6' in p:
640 fmt = "IPv6 {} > {} nh=%IPv6.nh%"
641 else:
642 fmt = "IP {} > {} proto=%IP.proto%"
643 return p.sprintf(fmt.format(*addr_fmt))
644 elif 'ARP' in p:
645 return p.sprintf("ARP %ARP.psrc% > %ARP.pdst%")
646 else:
647 return p.sprintf("Ethernet type=%04xr,Ether.type%")
648 return "Other"
649 session_extractor = _session_extractor
650 sessions = defaultdict(self.__class__) # type: DefaultDict[str, _PacketList[_Inner]] # noqa: E501
651 for p in self.res:

Callers

nothing calls this directly

Calls 2

sprintfMethod · 0.45
formatMethod · 0.45

Tested by

no test coverage detected