(self)
| 483 | self.assertEqual(os.listdir(self.result_path), []) |
| 484 | |
| 485 | def test_fs_multithreaded(self): |
| 486 | l = threading.Lock() |
| 487 | |
| 488 | def create_tasks(t, n, q): |
| 489 | for i in range(n): |
| 490 | with l: |
| 491 | message = str((t * n) + i) |
| 492 | self.huey.storage.enqueue(message.encode('utf8')) |
| 493 | q.put(message) |
| 494 | |
| 495 | def dequeue_tasks(q): |
| 496 | while True: |
| 497 | with l: |
| 498 | data = self.huey.storage.dequeue() |
| 499 | if data is None: |
| 500 | break |
| 501 | q.put(data.decode('utf8')) |
| 502 | |
| 503 | nthreads = 10 |
| 504 | ntasks = 50 |
| 505 | in_q = Queue() |
| 506 | threads = [] |
| 507 | for i in range(nthreads): |
| 508 | t = threading.Thread(target=create_tasks, args=(i, ntasks, in_q)) |
| 509 | t.daemon = True |
| 510 | threads.append(t) |
| 511 | |
| 512 | for t in threads: t.start() |
| 513 | for t in threads: t.join(timeout=10.) |
| 514 | |
| 515 | self.assertEqual(self.huey.pending_count(), nthreads * ntasks) |
| 516 | |
| 517 | out_q = Queue() |
| 518 | threads = [] |
| 519 | for i in range(nthreads): |
| 520 | t = threading.Thread(target=dequeue_tasks, args=(out_q,)) |
| 521 | t.daemon = True |
| 522 | threads.append(t) |
| 523 | |
| 524 | for t in threads: t.start() |
| 525 | for t in threads: t.join(timeout=10.) |
| 526 | |
| 527 | self.assertEqual(out_q.qsize(), nthreads * ntasks) |
| 528 | self.assertEqual(self.huey.pending_count(), 0) |
| 529 | |
| 530 | # Ensure that the order in which tasks were enqueued is the order in |
| 531 | # which they are dequeued. |
| 532 | for i in range(nthreads * ntasks): |
| 533 | self.assertEqual(in_q.get(), out_q.get()) |
| 534 | |
| 535 | @unittest.skipIf(TRAVIS, 'skipping test that is flaky on travis-ci') |
| 536 | def test_consumer_integration(self): |
nothing calls this directly
no test coverage detected