MCPcopy Index your code
hub / github.com/Boris-code/feapder / check_proxy

Function check_proxy

feapder/network/proxy_pool_old.py:163–214  ·  view source on GitHub ↗

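Proxy validity check. Parameters: ip and port (proxy address), proxies (optional requests-style proxies mapping), type (0: socket check, 1: requests check), timeout, logger, show_error_log. Returns 1 when the check succeeds, otherwise 0.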

(
    ip="",
    port="",
    proxies=None,
    type=0,
    timeout=5,
    logger=None,
    show_error_log=True,
    **kwargs,
)

Source from the content-addressed store, hash-verified

161
162
def check_proxy(
    ip="",
    port="",
    proxies=None,
    type=0,
    timeout=5,
    logger=None,
    show_error_log=True,
    **kwargs,
):
    """
    Check whether a proxy is usable.

    :param ip: proxy host; used for the socket check and to build ``proxies``
        when none is supplied
    :param port: proxy port; converted with ``int()`` for the socket check
    :param proxies: requests-style proxies mapping; when falsy it is built
        from ``ip`` and ``port``
    :param type: 0: socket 1: requests. The socket check is only used when
        both ``ip`` and ``port`` are given; otherwise the requests check runs
    :param timeout: timeout passed to the socket / to ``requests.get``
    :param logger: logger used for failure messages; falls back to the
        module-level ``log`` when falsy
    :param show_error_log: when true, failures are logged at debug level
    :param kwargs: accepted for call compatibility and ignored
    :return: 1 if the check succeeded, otherwise 0
    """
    if not logger:
        logger = log
    ok = 0
    if type == 0 and ip and port:
        # A successful socket connect does not guarantee the proxy is usable,
        # e.g. "Connection closed by foreign host." still means it is broken.
        # The ``with`` block closes the socket; no explicit close() is needed.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
            sk.settimeout(timeout)
            try:
                # This check is required, otherwise the proxy is never refreshed.
                sk.connect((ip, int(port)))
                ok = 1
            except Exception as e:
                # Best-effort probe: any failure just means "not usable".
                if show_error_log:
                    logger.debug("check proxy failed: {} {}:{}".format(e, ip, port))
    else:
        if not proxies:
            if not ip:
                # Nothing to check: without a host the proxy URL would be the
                # malformed "http://:", so skip the pointless HTTP request.
                if show_error_log:
                    logger.debug(
                        "check proxy failed: {} {}:{} {}".format(
                            "no proxy address given", ip, port, proxies
                        )
                    )
                return ok
            proxies = {
                "http": "http://{}:{}".format(ip, port),
                "https": "http://{}:{}".format(ip, port),
            }
        try:
            # stream=True: only the connection/headers matter, not the body.
            r = requests.get(
                "http://www.baidu.com", proxies=proxies, timeout=timeout, stream=True
            )
            ok = 1
            r.close()
        except Exception as e:
            # Best-effort probe: any failure just means "not usable".
            if show_error_log:
                logger.debug(
                    "check proxy failed: {} {}:{} {}".format(e, ip, port, proxies)
                )
    return ok
215
216
217class ProxyItem(object):

Callers 1

is_validMethod · 0.85

Calls 3

debugMethod · 0.80
closeMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected