(self, option="All", to_ip="All", from_ip="All")
| 98 | return graph |
| 99 | |
| 100 | def draw_graph(self, option="All", to_ip="All", from_ip="All"): |
| 101 | #f = Digraph('network_diagram - '+option, filename=self.filename, engine="dot", format="png") |
| 102 | #f.attr(rankdir='LR', size='8,5') |
| 103 | if len(memory.lan_hosts) > 40: |
| 104 | f = Digraph('network_diagram - '+option, filename=self.filename, engine="sfdp", format="png") |
| 105 | elif len(memory.lan_hosts) > 20: |
| 106 | f = Digraph('network_diagram - '+option, filename=self.filename, engine="circo", format="png") |
| 107 | else: |
| 108 | f = Digraph('network_diagram - '+option, filename=self.filename, engine="dot", format="png") |
| 109 | |
| 110 | interactive_graph = Network(directed=True, height="750px", width="100%", bgcolor="#222222", font_color="white") |
| 111 | interactive_graph.barnes_hut() |
| 112 | vis_nodes = [] |
| 113 | vis_edges = [] |
| 114 | |
| 115 | f.attr('node', shape='doublecircle') |
| 116 | #f.node('defaultGateway') |
| 117 | |
| 118 | f.attr('node', shape='circle') |
| 119 | |
| 120 | log.info("draw_graph: option=%s sessions=%d", option, len(self.sessions)) |
| 121 | edge_present = False |
| 122 | |
| 123 | mal, tor, http, https, icmp, dns, clear_text, unknown, covert = 0, 0, 0, 0, 0, 0, 0, 0, 0 |
| 124 | |
| 125 | if option == "All": |
| 126 | # add nodes |
| 127 | for session in self.sessions: |
| 128 | src, dst, port = session.split("/") |
| 129 | |
| 130 | if (src == from_ip and dst == to_ip) or \ |
| 131 | (from_ip == "All" and to_ip == "All") or \ |
| 132 | (to_ip == "All" and from_ip == src) or \ |
| 133 | (to_ip == dst and from_ip == "All"): |
| 134 | # TODO: Improvise this logic below |
| 135 | # * graphviz graph is not very good with the ":" in strings |
| 136 | if ":" in src: |
| 137 | map_src = src.replace(":",".") |
| 138 | else: |
| 139 | map_src = src |
| 140 | if ":" in dst: |
| 141 | map_dst = dst.replace(":", ".") |
| 142 | else: |
| 143 | map_dst = dst |
| 144 | |
| 145 | # Lan Host |
| 146 | if memory.packet_db[session].Ethernet["src"] not in memory.lan_hosts: |
| 147 | curr_node = map_src+"\n"+memory.packet_db[session].Ethernet["src"].replace(":",".") |
| 148 | f.node(curr_node) |
| 149 | else: |
| 150 | curr_node = _node_label(memory.packet_db[session].Ethernet["src"]) |
| 151 | f.node(curr_node) |
| 152 | |
| 153 | # Destination |
| 154 | if dst in memory.destination_hosts: |
| 155 | if memory.destination_hosts[dst].mac in memory.lan_hosts: |
| 156 | destination = _node_label(memory.destination_hosts[dst].mac) |
| 157 | dlabel = memory.destination_hosts[dst].domain_name |
no test coverage detected