Last processor in the chain. Instead of passing tests further it creates commands and output processors, executes them in multiple worker processes and sends results to the previous processor.
| 23 | |
| 24 | |
| 25 | class ExecutionProc(base.TestProc): |
| 26 | """Last processor in the chain. Instead of passing tests further it creates |
| 27 | commands and output processors, executes them in multiple worker processes and |
| 28 | sends results to the previous processor. |
| 29 | """ |
| 30 | |
| 31 | def __init__(self, ctx, jobs, outproc_factory=None): |
| 32 | super(ExecutionProc, self).__init__() |
| 33 | self.ctx = ctx |
| 34 | self.ctx.pool.init(jobs, notify_function=self.notify_previous) |
| 35 | self._outproc_factory = outproc_factory or (lambda t: t.output_proc) |
| 36 | self._tests = {} |
| 37 | |
| 38 | def connect_to(self, next_proc): |
| 39 | assert False, \ |
| 40 | 'ExecutionProc cannot be connected to anything' # pragma: no cover |
| 41 | |
| 42 | def run(self, requirement=None): |
| 43 | for pool_result in self.ctx.pool.results(requirement): |
| 44 | self._unpack_result(pool_result) |
| 45 | |
| 46 | def next_test(self, test): |
| 47 | if self.is_stopped: |
| 48 | return False |
| 49 | |
| 50 | test_id = test.procid |
| 51 | cmd = test.get_command(self.ctx) |
| 52 | self._tests[test_id] = test, cmd |
| 53 | |
| 54 | outproc = self._outproc_factory(test) |
| 55 | self.ctx.pool.add_jobs([Job(test_id, cmd, outproc, test.keep_output)]) |
| 56 | |
| 57 | return True |
| 58 | |
| 59 | def result_for(self, test, result): |
| 60 | assert False, \ |
| 61 | 'ExecutionProc cannot receive results' # pragma: no cover |
| 62 | |
| 63 | def stop(self): |
| 64 | super(ExecutionProc, self).stop() |
| 65 | self.ctx.pool.abort() |
| 66 | |
| 67 | def _unpack_result(self, pool_result): |
| 68 | if pool_result.heartbeat: |
| 69 | self.heartbeat() |
| 70 | return |
| 71 | |
| 72 | job_result = pool_result.value |
| 73 | test_id, result = job_result |
| 74 | |
| 75 | test, result.cmd = self._tests[test_id] |
| 76 | del self._tests[test_id] |
| 77 | self._send_result(test, result) |
no outgoing calls
no test coverage detected
searching dependent graphs…