First processor in the chain that passes all tests to the next processor.
| 5 | from . import base |
| 6 | |
| 7 | class LoadProc(base.TestProc): |
| 8 | """First processor in the chain that passes all tests to the next processor. |
| 9 | """ |
| 10 | |
| 11 | def __init__(self, tests, initial_batch_size=float('inf')): |
| 12 | super(LoadProc, self).__init__() |
| 13 | |
| 14 | self.tests = tests |
| 15 | self.initial_batch_size = initial_batch_size |
| 16 | |
| 17 | |
| 18 | def load_initial_tests(self): |
| 19 | """Send an initial batch of tests down to executor""" |
| 20 | if not self.initial_batch_size: |
| 21 | return |
| 22 | to_load = self.initial_batch_size |
| 23 | for t in self.tests: |
| 24 | if self._send_test(t): |
| 25 | to_load -= 1 |
| 26 | if not to_load: |
| 27 | break |
| 28 | |
| 29 | def next_test(self, test): |
| 30 | assert False, \ |
| 31 | 'Nothing can be connected to the LoadProc' # pragma: no cover |
| 32 | |
| 33 | def result_for(self, test, result): |
| 34 | for t in self.tests: |
| 35 | if self._send_test(t): |
| 36 | break |
no outgoing calls
searching dependent graphs…