(self, pcap_name: str)
| 91 | log.error("SqliteStore.save_session: %s", exc) |
| 92 | |
def load_session(self, pcap_name: str) -> None:
    """Restore the stored session for *pcap_name* into ``memory``.

    Reads one row from the ``sessions`` table, decodes its five JSON
    columns, and only then assigns the results to the ``memory``
    attributes, so a bad blob never leaves a half-loaded state.

    Nothing is raised to the caller: a missing connection returns
    silently, while a failed read, an unknown name, or undecodable data
    is logged and the method returns without modifying ``memory``.

    Args:
        pcap_name: Value matched against the ``pcap_name`` column.
    """
    if self._con is None:
        return

    query = """
        SELECT packet_db, lan_hosts, dest_hosts, tor_traffic, mal_traffic
        FROM sessions WHERE pcap_name = ?
        """
    try:
        record = self._con.execute(query, (pcap_name,)).fetchone()
    except Exception as err:
        log.error("SqliteStore.load_session: DB read failed for '%s': %s", pcap_name, err)
        return

    if record is None:
        log.warning("SqliteStore.load_session: '%s' not found", pcap_name)
        return

    def _revive(model, blob):
        """Decode a JSON object blob and validate each value with *model*."""
        return {
            key: model.model_validate(raw)
            for key, raw in json.loads(blob).items()
        }

    # Decode every column up front; memory is only touched once all five
    # blobs have parsed successfully.
    try:
        parsed_packets = _revive(PacketSession, record[0])
        parsed_lan = _revive(LanHost, record[1])
        parsed_dest = _revive(DestinationHost, record[2])
        parsed_tor = json.loads(record[3])
        parsed_mal = json.loads(record[4])
    except Exception as err:
        log.error("SqliteStore.load_session: corrupt data for '%s': %s", pcap_name, err)
        return

    memory.packet_db = parsed_packets
    memory.lan_hosts = parsed_lan
    memory.destination_hosts = parsed_dest
    memory.possible_tor_traffic = parsed_tor
    memory.possible_mal_traffic = parsed_mal

    log.info(
        "SqliteStore: loaded session '%s' (%d sessions, %d LAN hosts, %d dest hosts)",
        pcap_name,
        len(memory.packet_db),
        len(memory.lan_hosts),
        len(memory.destination_hosts),
    )
| 142 | |
| 143 | def list_sessions(self) -> list[str]: |
| 144 | if self._con is None: |
no outgoing calls