(self, pcap_file_name: str, pcap_parser_engine: str = "auto")
| 174 | """ |
| 175 | |
def __init__(self, pcap_file_name: str, pcap_parser_engine: str = "auto") -> None:
    """Parse a PCAP file and feed every packet through the analysis pipeline.

    Calls ``_reset_memory()`` first, then obtains a parser engine from
    ``select_engine`` and passes each packet it streams to
    ``_process_packet``. Once the stream is exhausted,
    ``_run_deferred_covert`` is invoked with the DNS candidates gathered
    along the way and a summary line is logged.

    Args:
        pcap_file_name: Path of the capture file handed to ``select_engine``.
        pcap_parser_engine: Name of the requested parser engine; stored
            unchanged on ``self.engine``. Defaults to ``"auto"``.

    Raises:
        SystemExit: With status 1 when ``select_engine`` raises
            ``ImportError`` or ``ValueError``.
    """
    # NOTE(review): presumably clears module-level analysis state left over
    # from a previous run -- confirm against _reset_memory().
    _reset_memory()
    self.engine = pcap_parser_engine
    self._dns_candidates: dict[str, str] = {}

    try:
        reader = select_engine(pcap_parser_engine, pcap_file_name)
    except (ImportError, ValueError) as err:
        # Without a usable engine nothing can be parsed: log and exit.
        log.error("Cannot create engine %r: %s", pcap_parser_engine, err)
        sys.exit(1)

    log.info("Reading PCAP: %s (engine=%s)", pcap_file_name, pcap_parser_engine)
    for packet in reader.stream():
        try:
            _process_packet(packet, self._dns_candidates)
        except Exception as err:
            # Best effort: a packet that fails processing is skipped so the
            # rest of the capture is still analysed.
            log.debug("PcapEngine: skipping packet: %s", err)
            continue

    _run_deferred_covert(self._dns_candidates)

    session_count = len(memory.packet_db)
    lan_host_count = len(memory.lan_hosts)
    dest_host_count = len(memory.destination_hosts)
    log.info(
        "PCAP analysis complete: %d sessions, %d LAN hosts, %d dest hosts",
        session_count,
        lan_host_count,
        dest_host_count,
    )
| 199 | |
| 200 | |
| 201 | class LivePcapEngine: |
nothing calls this directly
no test coverage detected