MCPcopy Index your code
hub / github.com/coleifer/huey / test_fs_multithreaded

Method test_fs_multithreaded

huey/tests/test_storage.py:485–533  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

483 self.assertEqual(os.listdir(self.result_path), [])
484
485 def test_fs_multithreaded(self):
486 l = threading.Lock()
487
488 def create_tasks(t, n, q):
489 for i in range(n):
490 with l:
491 message = str((t * n) + i)
492 self.huey.storage.enqueue(message.encode('utf8'))
493 q.put(message)
494
495 def dequeue_tasks(q):
496 while True:
497 with l:
498 data = self.huey.storage.dequeue()
499 if data is None:
500 break
501 q.put(data.decode('utf8'))
502
503 nthreads = 10
504 ntasks = 50
505 in_q = Queue()
506 threads = []
507 for i in range(nthreads):
508 t = threading.Thread(target=create_tasks, args=(i, ntasks, in_q))
509 t.daemon = True
510 threads.append(t)
511
512 for t in threads: t.start()
513 for t in threads: t.join(timeout=10.)
514
515 self.assertEqual(self.huey.pending_count(), nthreads * ntasks)
516
517 out_q = Queue()
518 threads = []
519 for i in range(nthreads):
520 t = threading.Thread(target=dequeue_tasks, args=(out_q,))
521 t.daemon = True
522 threads.append(t)
523
524 for t in threads: t.start()
525 for t in threads: t.join(timeout=10.)
526
527 self.assertEqual(out_q.qsize(), nthreads * ntasks)
528 self.assertEqual(self.huey.pending_count(), 0)
529
530 # Ensure that the order in which tasks were enqueued is the order in
531 # which they are dequeued.
532 for i in range(nthreads * ntasks):
533 self.assertEqual(in_q.get(), out_q.get())
534
535 @unittest.skipIf(TRAVIS, 'skipping test that is flaky on travis-ci')
536 def test_consumer_integration(self):

Callers

nothing calls this directly

Calls 3

pending_countMethod · 0.80
startMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected