MCPcopy
hub / github.com/srixivas/PcapXray / load_session

Method load_session

Source/Module/sqlite_store.py:93–141  ·  view source on GitHub ↗
(self, pcap_name: str)

Source from the content-addressed store, hash-verified

91 log.error("SqliteStore.save_session: %s", exc)
92
def load_session(self, pcap_name: str) -> None:
    """Restore the stored session for *pcap_name* onto ``memory``.

    Reads the single ``sessions`` row whose ``pcap_name`` matches, decodes
    its five JSON columns, and only then rebinds the corresponding
    ``memory`` attributes. The method never raises for the failure modes
    it handles; each one is logged and the call returns without touching
    ``memory``:

    * no open connection (``self._con is None``) -- silent no-op;
    * the query itself fails -- logged at error level;
    * no row matches -- logged at warning level;
    * any column fails to decode or validate -- logged at error level.

    Args:
        pcap_name: Key of the session row to load.
    """
    connection = self._con
    if connection is None:
        return

    try:
        record = connection.execute(
            """
            SELECT packet_db, lan_hosts, dest_hosts, tor_traffic, mal_traffic
            FROM sessions WHERE pcap_name = ?
            """,
            (pcap_name,),
        ).fetchone()
    except Exception as err:
        log.error("SqliteStore.load_session: DB read failed for '%s': %s", pcap_name, err)
        return

    if record is None:
        log.warning("SqliteStore.load_session: '%s' not found", pcap_name)
        return

    def _hydrate(model, blob):
        # Decode one JSON object and validate every value against `model`.
        return {
            key: model.model_validate(raw)
            for key, raw in json.loads(blob).items()
        }

    # Decode everything up front: a failure in any column must leave
    # `memory` exactly as it was, never half-updated.
    try:
        sessions = _hydrate(PacketSession, record[0])
        lan = _hydrate(LanHost, record[1])
        destinations = _hydrate(DestinationHost, record[2])
        tor = json.loads(record[3])
        mal = json.loads(record[4])
    except Exception as err:
        log.error("SqliteStore.load_session: corrupt data for '%s': %s", pcap_name, err)
        return

    memory.packet_db = sessions
    memory.lan_hosts = lan
    memory.destination_hosts = destinations
    memory.possible_tor_traffic = tor
    memory.possible_mal_traffic = mal

    log.info(
        "SqliteStore: loaded session '%s' (%d sessions, %d LAN hosts, %d dest hosts)",
        pcap_name,
        len(memory.packet_db),
        len(memory.lan_hosts),
        len(memory.destination_hosts),
    )
142
143 def list_sessions(self) -> list[str]:
144 if self._con is None:

Calls

no outgoing calls