hub / github.com/aosabook/500lines / test_max_tasks

Method test_max_tasks

crawler/code/test.py:249–268
(self)

Source from the content-addressed store, hash-verified

        self.assertDoneCount(4)

    def test_max_tasks(self):
        n_tasks = 0
        max_tasks = 0

        @asyncio.coroutine
        def handler(_):
            nonlocal n_tasks, max_tasks
            n_tasks += 1
            max_tasks = max(n_tasks, max_tasks)
            yield from asyncio.sleep(0.01, loop=self.loop)
            n_tasks -= 1
            return web.Response(body=b'')

        urls = ['/0', '/1', '/2']
        for url in urls:
            self.add_handler(url, handler)
        home = self.add_page('/', urls)
        self.crawl([home], max_tasks=2)
        self.assertEqual(2, max_tasks)
        self.assertDoneCount(4)

    def test_max_tries(self):
        n_tries = 0
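
The test checks a concurrency cap by counting in-flight handlers and recording the highest count reached. Below is a minimal standalone sketch of that counting technique. It does not use the crawler or its test harness. `measure_peak_concurrency`, `limit`, and `peak` are hypothetical names, and the `asyncio.Semaphore` is only an assumed stand-in for whatever mechanism the crawler uses to enforce `max_tasks`. The sketch is written with `async def`/`await` because `@asyncio.coroutine` and the `loop=` argument to `asyncio.sleep` have been removed from recent Python versions.

```python
import asyncio


async def measure_peak_concurrency(n_jobs: int, max_tasks: int) -> int:
    """Run n_jobs handlers with at most max_tasks in flight; return the observed peak."""
    n_tasks = 0  # handlers currently running
    peak = 0     # highest value n_tasks has reached
    # Assumption: a semaphore stands in for the crawler's own max_tasks limit.
    limit = asyncio.Semaphore(max_tasks)

    async def handler() -> None:
        nonlocal n_tasks, peak
        async with limit:
            n_tasks += 1
            peak = max(n_tasks, peak)
            # Suspend briefly so other handlers get a chance to overlap.
            await asyncio.sleep(0.01)
            n_tasks -= 1

    await asyncio.gather(*(handler() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    # Three jobs with a cap of two, mirroring the three URLs and max_tasks=2 in the test.
    print(asyncio.run(measure_peak_concurrency(n_jobs=3, max_tasks=2)))
```

As in the test, the short sleep is what makes the check meaningful: without a suspension point inside the handler, each one would finish before the next started and the peak would never exceed 1, whatever the cap.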

Callers

nothing calls this directly

Calls 4

add_handler · Method · 0.95
add_page · Method · 0.95
crawl · Method · 0.95
assertDoneCount · Method · 0.95

Tested by

no test coverage detected