(self, tests, batch_size, max_heavy)
| 62 | class TestSequenceProc(unittest.TestCase): |
| 63 | |
def _test(self, tests, batch_size, max_heavy):
    """Run `tests` through a sequencing pipeline and check its invariants.

    The pipeline is wired as: loader -> result observer -> sequencer ->
    execution. While the execution stage still holds tests, the number of
    heavy tests it holds must never exceed `max_heavy`. Once it is drained,
    the observer must have recorded every test's `n`.

    Args:
      tests: Re-iterable collection of test objects exposing an `n`
        attribute; it is iterated once for loading and once more for the
        final check. The objects held by the execution stage are read for
        `is_heavy` (presumably the same objects - confirm against the
        fakes).
      batch_size: Forwarded to `LoadProc` as `initial_batch_size`.
      max_heavy: Forwarded to `SequenceProc`, and used as the upper bound
        on heavy tests held by the execution stage at once.
    """
    # Build the stages in pipeline order, then chain them together.
    source = LoadProc(iter(tests), initial_batch_size=batch_size)
    observer = FakeResultObserver()
    sequencer = SequenceProc(max_heavy)
    executor = FakeExecutionProc()

    source.connect_to(observer)
    observer.connect_to(sequencer)
    sequencer.connect_to(executor)

    # Prime the execution queue with the initial batch of tests.
    source.load_initial_tests()

    # Step through the simulated execution one test at a time.
    while executor.tests:
        # Invariant: never more than max_heavy heavy tests at once.
        heavy_running = 0
        for running in executor.tests:
            heavy_running += int(running.is_heavy)
        self.assertLessEqual(heavy_running, max_heavy)

        # Running a test returns its result, which (as in the real
        # pipeline) pulls another test into the pipeline.
        executor.run()

    # Every test must have been processed and have delivered a result.
    expected_ids = {item.n for item in tests}
    self.assertEqual(expected_ids, observer.tests)
| 91 | |
| 92 | def test_wrong_usage(self): |
| 93 | with self.assertRaises(Exception): |
no test coverage detected