(self)
| 124 | return pl |
| 125 | |
| 126 | def run(self): |
| 127 | # type: () -> None |
| 128 | log_runtime.debug("Pipe engine thread started.") |
| 129 | try: |
| 130 | for p in self.active_pipes: |
| 131 | p.start() |
| 132 | sources = self.active_sources |
| 133 | sources.add(self) |
| 134 | exhausted = set([]) # type: Set[Union[Source, PipeEngine]] |
| 135 | RUN = True |
| 136 | STOP_IF_EXHAUSTED = False |
| 137 | while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1): |
| 138 | fds = select_objects(sources, 0.5) |
| 139 | for fd in fds: |
| 140 | if fd is self: |
| 141 | cmd = self._read_cmd() |
| 142 | if cmd == "X": |
| 143 | RUN = False |
| 144 | break |
| 145 | elif cmd == "B": |
| 146 | STOP_IF_EXHAUSTED = True |
| 147 | elif cmd == "A": |
| 148 | sources = self.active_sources - exhausted |
| 149 | sources.add(self) |
| 150 | else: |
| 151 | warning("Unknown internal pipe engine command: %r." |
| 152 | " Ignoring.", cmd) |
| 153 | elif fd in sources: |
| 154 | try: |
| 155 | fd.deliver() |
| 156 | except Exception as e: |
| 157 | log_runtime.exception("piping from %s failed: %s", |
| 158 | fd.name, e) |
| 159 | else: |
| 160 | if fd.exhausted(): |
| 161 | exhausted.add(fd) |
| 162 | sources.remove(fd) |
| 163 | except KeyboardInterrupt: |
| 164 | pass |
| 165 | finally: |
| 166 | try: |
| 167 | for p in self.active_pipes: |
| 168 | p.stop() |
| 169 | finally: |
| 170 | self.thread_lock.release() |
| 171 | log_runtime.debug("Pipe engine thread stopped.") |
| 172 | |
| 173 | def start(self): |
| 174 | # type: () -> None |
nothing calls this directly
no test coverage detected