MCPcopy
hub / github.com/1Panel-dev/MaxKB / SafeHTTPAdapter

Class SafeHTTPAdapter

apps/oss/serializers/file.py:172–216  ·  view source on GitHub ↗

安全的 HTTP 适配器,防止 DNS 重绑定攻击 在建立连接前验证目标 IP 地址

Source from the content-addressed store, hash-verified

170
171
172class SafeHTTPAdapter(HTTPAdapter):
173 """
174 安全的 HTTP 适配器,防止 DNS 重绑定攻击
175 在建立连接前验证目标 IP 地址
176 """
177
178 def send(self, request, **kwargs):
179 # 解析 URL 获取主机名
180 parsed_url = urlparse(request.url)
181 host = parsed_url.hostname
182
183 if host:
184 # 验证目标 IP 是否安全
185 self._validate_host_ip(host)
186
187 return super().send(request, **kwargs)
188
189 def _validate_host_ip(self, host: str):
190 """验证主机解析的 IP 地址是否安全"""
191 try:
192 # 获取所有 IP 地址(包括 IPv4 和 IPv6)
193 addr_infos = socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
194
195 for addr_info in addr_infos:
196 ip = addr_info[4][0]
197 if self._is_unsafe_ip(ip):
198 raise AppApiException(500, _('Access to internal IP addresses is blocked'))
199 except AppApiException:
200 raise
201 except Exception as e:
202 raise AppApiException(500, _('Failed to resolve host: {error}').format(error=str(e)))
203
204 def _is_unsafe_ip(self, ip: str) -> bool:
205 """检查 IP 地址是否属于不安全的范围"""
206 try:
207 ip_addr = ipaddress.ip_address(ip)
208 return (
209 ip_addr.is_private or
210 ip_addr.is_loopback or
211 ip_addr.is_reserved or
212 ip_addr.is_link_local or
213 ip_addr.is_multicast
214 )
215 except Exception:
216 return True
217
218
219def get_url_content(url, application_id: str):

Callers 1

get_url_contentFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected