Engine-agnostic packet processor. Mutates memory.* state in-place.
(pkt: NormalizedPacket, dns_candidates: dict)
| 27 | |
| 28 | |
| 29 | def _process_packet(pkt: NormalizedPacket, dns_candidates: dict) -> None: |
| 30 | """Engine-agnostic packet processor. Mutates memory.* state in-place.""" |
| 31 | try: |
| 32 | private_source = ipaddress.ip_address(pkt.src_ip).is_private |
| 33 | except Exception: |
| 34 | private_source = None |
| 35 | try: |
| 36 | private_destination = ipaddress.ip_address(pkt.dst_ip).is_private |
| 37 | except Exception: |
| 38 | private_destination = None |
| 39 | |
| 40 | session_key: str | None = None |
| 41 | |
| 42 | if pkt.proto in ("TCP", "UDP"): |
| 43 | src_p = str(pkt.src_port or 0) |
| 44 | dst_p = str(pkt.dst_port or 0) |
| 45 | |
| 46 | if private_source and private_destination: |
| 47 | key1 = f"{pkt.src_ip}/{pkt.dst_ip}/{dst_p}" |
| 48 | key2 = f"{pkt.dst_ip}/{pkt.src_ip}/{src_p}" |
| 49 | session_key = key2 if key2 in memory.packet_db else key1 |
| 50 | if pkt.src_mac: |
| 51 | if pkt.src_mac not in memory.lan_hosts: |
| 52 | memory.lan_hosts[pkt.src_mac] = LanHost(ip=pkt.src_ip) |
| 53 | if pkt.dst_mac not in memory.lan_hosts: |
| 54 | memory.lan_hosts[pkt.dst_mac] = LanHost(ip=pkt.dst_ip) |
| 55 | |
| 56 | elif private_source: |
| 57 | session_key = f"{pkt.src_ip}/{pkt.dst_ip}/{dst_p}" |
| 58 | if pkt.src_mac: |
| 59 | if pkt.src_mac not in memory.lan_hosts: |
| 60 | memory.lan_hosts[pkt.src_mac] = LanHost(ip=pkt.src_ip) |
| 61 | if pkt.dst_ip not in memory.destination_hosts: |
| 62 | memory.destination_hosts[pkt.dst_ip] = DestinationHost(mac=pkt.dst_mac) |
| 63 | |
| 64 | elif private_destination: |
| 65 | session_key = f"{pkt.dst_ip}/{pkt.src_ip}/{src_p}" |
| 66 | if pkt.dst_mac: |
| 67 | if pkt.dst_mac not in memory.lan_hosts: |
| 68 | memory.lan_hosts[pkt.dst_mac] = LanHost(ip=pkt.dst_ip) |
| 69 | if pkt.src_ip not in memory.destination_hosts: |
| 70 | memory.destination_hosts[pkt.src_ip] = DestinationHost(mac=pkt.src_mac) |
| 71 | |
| 72 | else: # both public |
| 73 | key1 = f"{pkt.src_ip}/{pkt.dst_ip}/{dst_p}" |
| 74 | key2 = f"{pkt.dst_ip}/{pkt.src_ip}/{src_p}" |
| 75 | session_key = key2 if key2 in memory.packet_db else key1 |
| 76 | if pkt.src_mac: |
| 77 | if pkt.src_ip not in memory.destination_hosts: |
| 78 | memory.destination_hosts[pkt.src_ip] = DestinationHost(mac=pkt.src_mac) |
| 79 | if pkt.dst_ip not in memory.destination_hosts: |
| 80 | memory.destination_hosts[pkt.dst_ip] = DestinationHost(mac=pkt.dst_mac) |
| 81 | |
| 82 | elif pkt.proto == "ICMP": |
| 83 | key1 = f"{pkt.src_ip}/{pkt.dst_ip}/ICMP" |
| 84 | key2 = f"{pkt.dst_ip}/{pkt.src_ip}/ICMP" |
| 85 | session_key = key2 if key2 in memory.packet_db else key1 |
| 86 |
no test coverage detected