MCPcopy
hub / github.com/srixivas/PcapXray / _process_packet

Function _process_packet

Source/Module/pcap_reader.py:29–133  ·  view source on GitHub ↗

Engine-agnostic packet processor. Mutates memory.* state in-place.

(pkt: NormalizedPacket, dns_candidates: dict)

Source from the content-addressed store, hash-verified

27
28
29def _process_packet(pkt: NormalizedPacket, dns_candidates: dict) -> None:
30 """Engine-agnostic packet processor. Mutates memory.* state in-place."""
31 try:
32 private_source = ipaddress.ip_address(pkt.src_ip).is_private
33 except Exception:
34 private_source = None
35 try:
36 private_destination = ipaddress.ip_address(pkt.dst_ip).is_private
37 except Exception:
38 private_destination = None
39
40 session_key: str | None = None
41
42 if pkt.proto in ("TCP", "UDP"):
43 src_p = str(pkt.src_port or 0)
44 dst_p = str(pkt.dst_port or 0)
45
46 if private_source and private_destination:
47 key1 = f"{pkt.src_ip}/{pkt.dst_ip}/{dst_p}"
48 key2 = f"{pkt.dst_ip}/{pkt.src_ip}/{src_p}"
49 session_key = key2 if key2 in memory.packet_db else key1
50 if pkt.src_mac:
51 if pkt.src_mac not in memory.lan_hosts:
52 memory.lan_hosts[pkt.src_mac] = LanHost(ip=pkt.src_ip)
53 if pkt.dst_mac not in memory.lan_hosts:
54 memory.lan_hosts[pkt.dst_mac] = LanHost(ip=pkt.dst_ip)
55
56 elif private_source:
57 session_key = f"{pkt.src_ip}/{pkt.dst_ip}/{dst_p}"
58 if pkt.src_mac:
59 if pkt.src_mac not in memory.lan_hosts:
60 memory.lan_hosts[pkt.src_mac] = LanHost(ip=pkt.src_ip)
61 if pkt.dst_ip not in memory.destination_hosts:
62 memory.destination_hosts[pkt.dst_ip] = DestinationHost(mac=pkt.dst_mac)
63
64 elif private_destination:
65 session_key = f"{pkt.dst_ip}/{pkt.src_ip}/{src_p}"
66 if pkt.dst_mac:
67 if pkt.dst_mac not in memory.lan_hosts:
68 memory.lan_hosts[pkt.dst_mac] = LanHost(ip=pkt.dst_ip)
69 if pkt.src_ip not in memory.destination_hosts:
70 memory.destination_hosts[pkt.src_ip] = DestinationHost(mac=pkt.src_mac)
71
72 else: # both public
73 key1 = f"{pkt.src_ip}/{pkt.dst_ip}/{dst_p}"
74 key2 = f"{pkt.dst_ip}/{pkt.src_ip}/{src_p}"
75 session_key = key2 if key2 in memory.packet_db else key1
76 if pkt.src_mac:
77 if pkt.src_ip not in memory.destination_hosts:
78 memory.destination_hosts[pkt.src_ip] = DestinationHost(mac=pkt.src_mac)
79 if pkt.dst_ip not in memory.destination_hosts:
80 memory.destination_hosts[pkt.dst_ip] = DestinationHost(mac=pkt.dst_mac)
81
82 elif pkt.proto == "ICMP":
83 key1 = f"{pkt.src_ip}/{pkt.dst_ip}/ICMP"
84 key2 = f"{pkt.dst_ip}/{pkt.src_ip}/ICMP"
85 session_key = key2 if key2 in memory.packet_db else key1
86

Callers 2

__init__Method · 0.85
_on_packetMethod · 0.85

Calls 5

LanHostClass · 0.90
DestinationHostClass · 0.90
PacketSessionClass · 0.90
is_multicastMethod · 0.80

Tested by

no test coverage detected