MCPcopy Index your code
hub / github.com/Python3WebSpider/ProxyPool / run_tests

Method run_tests

proxypool/processors/tester.py:113–131  ·  view source on GitHub ↗

test all proxies in batches, reusing a single aiohttp session :return:

(self)

Source from the content-addressed store, hash-verified

111 f'proxy {proxy.string()} is invalid, decrease score')
112
113 async def run_tests(self):
114 """
115 test all proxies in batches, reusing a single aiohttp session
116 :return:
117 """
118 count = self.redis.count()
119 logger.debug(f'{count} proxies to test')
120 cursor = 0
121 connector = aiohttp.TCPConnector(ssl=False, limit=TEST_BATCH)
122 async with aiohttp.ClientSession(connector=connector) as session:
123 while True:
124 logger.debug(
125 f'testing proxies use cursor {cursor}, count {TEST_BATCH}')
126 cursor, proxies = self.redis.batch(cursor, count=TEST_BATCH)
127 if proxies:
128 tasks = [self.test(proxy, session) for proxy in proxies]
129 await asyncio.gather(*tasks, return_exceptions=True)
130 if not cursor:
131 break
132
133 @logger.catch
134 def run(self):

Callers 1

runMethod · 0.95

Calls 3

testMethod · 0.95
countMethod · 0.80
batchMethod · 0.80

Tested by

no test coverage detected