MCPcopy
hub / github.com/HelloZeroNet/ZeroNet / portCheck

Method portCheck

src/File/FileServer.py:152–221  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

150 FileRequest = imp.load_source("FileRequest", "src/File/FileRequest.py").FileRequest
151
152 def portCheck(self):
153 if config.offline:
154 self.log.info("Offline mode: port check disabled")
155 res = {"ipv4": None, "ipv6": None}
156 self.port_opened = res
157 return res
158
159 if config.ip_external:
160 for ip_external in config.ip_external:
161 SiteManager.peer_blacklist.append((ip_external, self.port)) # Add myself to peer blacklist
162
163 ip_external_types = set([helper.getIpType(ip) for ip in config.ip_external])
164 res = {
165 "ipv4": "ipv4" in ip_external_types,
166 "ipv6": "ipv6" in ip_external_types
167 }
168 self.ip_external_list = config.ip_external
169 self.port_opened.update(res)
170 self.log.info("Server port opened based on configuration ipv4: %s, ipv6: %s" % (res["ipv4"], res["ipv6"]))
171 return res
172
173 self.port_opened = {}
174 if self.ui_server:
175 self.ui_server.updateWebsocket()
176
177 if "ipv6" in self.supported_ip_types:
178 res_ipv6_thread = gevent.spawn(self.portchecker.portCheck, self.port, "ipv6")
179 else:
180 res_ipv6_thread = None
181
182 res_ipv4 = self.portchecker.portCheck(self.port, "ipv4")
183 if not res_ipv4["opened"] and config.tor != "always":
184 if self.portchecker.portOpen(self.port):
185 res_ipv4 = self.portchecker.portCheck(self.port, "ipv4")
186
187 if res_ipv6_thread is None:
188 res_ipv6 = {"ip": None, "opened": None}
189 else:
190 res_ipv6 = res_ipv6_thread.get()
191 if res_ipv6["opened"] and not helper.getIpType(res_ipv6["ip"]) == "ipv6":
192 self.log.info("Invalid IPv6 address from port check: %s" % res_ipv6["ip"])
193 res_ipv6["opened"] = False
194
195 self.ip_external_list = []
196 for res_ip in [res_ipv4, res_ipv6]:
197 if res_ip["ip"] and res_ip["ip"] not in self.ip_external_list:
198 self.ip_external_list.append(res_ip["ip"])
199 SiteManager.peer_blacklist.append((res_ip["ip"], self.port))
200
201 self.log.info("Server port opened ipv4: %s, ipv6: %s" % (res_ipv4["opened"], res_ipv6["opened"]))
202
203 res = {"ipv4": res_ipv4["opened"], "ipv6": res_ipv6["opened"]}
204
205 # Add external IPs from local interfaces
206 interface_ips = helper.getInterfaceIps("ipv4")
207 if "ipv6" in self.supported_ip_types:
208 interface_ips += helper.getInterfaceIps("ipv6")
209 for ip in interface_ips:

Callers 3

sitePublishMethod · 0.95
checkSitesMethod · 0.95
actionServerPortcheckMethod · 0.45

Calls 4

portOpenMethod · 0.80
updateMethod · 0.45
updateWebsocketMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected