安全的 HTTP 适配器,防止 DNS 重绑定攻击 在建立连接前验证目标 IP 地址
| 170 | |
| 171 | |
| 172 | class SafeHTTPAdapter(HTTPAdapter): |
| 173 | """ |
| 174 | 安全的 HTTP 适配器,防止 DNS 重绑定攻击 |
| 175 | 在建立连接前验证目标 IP 地址 |
| 176 | """ |
| 177 | |
| 178 | def send(self, request, **kwargs): |
| 179 | # 解析 URL 获取主机名 |
| 180 | parsed_url = urlparse(request.url) |
| 181 | host = parsed_url.hostname |
| 182 | |
| 183 | if host: |
| 184 | # 验证目标 IP 是否安全 |
| 185 | self._validate_host_ip(host) |
| 186 | |
| 187 | return super().send(request, **kwargs) |
| 188 | |
| 189 | def _validate_host_ip(self, host: str): |
| 190 | """验证主机解析的 IP 地址是否安全""" |
| 191 | try: |
| 192 | # 获取所有 IP 地址(包括 IPv4 和 IPv6) |
| 193 | addr_infos = socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM) |
| 194 | |
| 195 | for addr_info in addr_infos: |
| 196 | ip = addr_info[4][0] |
| 197 | if self._is_unsafe_ip(ip): |
| 198 | raise AppApiException(500, _('Access to internal IP addresses is blocked')) |
| 199 | except AppApiException: |
| 200 | raise |
| 201 | except Exception as e: |
| 202 | raise AppApiException(500, _('Failed to resolve host: {error}').format(error=str(e))) |
| 203 | |
| 204 | def _is_unsafe_ip(self, ip: str) -> bool: |
| 205 | """检查 IP 地址是否属于不安全的范围""" |
| 206 | try: |
| 207 | ip_addr = ipaddress.ip_address(ip) |
| 208 | return ( |
| 209 | ip_addr.is_private or |
| 210 | ip_addr.is_loopback or |
| 211 | ip_addr.is_reserved or |
| 212 | ip_addr.is_link_local or |
| 213 | ip_addr.is_multicast |
| 214 | ) |
| 215 | except Exception: |
| 216 | return True |
| 217 | |
| 218 | |
| 219 | def get_url_content(url, application_id: str): |