Maps function "fn" to items in generator "gen" on the worker processes in an arbitrary order. The items are expected to be lists of arguments to the function. Returns a results iterator. A result value of type MaybeResult either indicates a heartbeat of the runner, i.e. indicating th
(self, fn, gen,
process_context_fn=None, process_context_args=None)
| 224 | ) |
| 225 | |
| 226 | def imap_unordered(self, fn, gen, |
| 227 | process_context_fn=None, process_context_args=None): |
| 228 | """Maps function "fn" to items in generator "gen" on the worker processes |
| 229 | in an arbitrary order. The items are expected to be lists of arguments to |
| 230 | the function. Returns a results iterator. A result value of type |
| 231 | MaybeResult either indicates a heartbeat of the runner, i.e. indicating |
| 232 | that the runner is still waiting for the result to be computed, or it wraps |
| 233 | the real result. |
| 234 | |
| 235 | Args: |
| 236 | process_context_fn: Function executed once by each worker. Expected to |
| 237 | return a process-context object. If present, this object is passed |
| 238 | as additional argument to each call to fn. |
| 239 | process_context_args: List of arguments for the invocation of |
| 240 | process_context_fn. All arguments will be pickled and sent beyond the |
| 241 | process boundary. |
| 242 | """ |
| 243 | if self.terminated: |
| 244 | return |
| 245 | try: |
| 246 | internal_error = False |
| 247 | gen = iter(gen) |
| 248 | self.advance = self._advance_more |
| 249 | |
| 250 | # Disable sigint and sigterm to prevent subprocesses from capturing the |
| 251 | # signals. |
| 252 | with without_sig(): |
| 253 | for w in range(self.num_workers): |
| 254 | p = Process(target=Worker, args=(fn, |
| 255 | self.work_queue, |
| 256 | self.done_queue, |
| 257 | process_context_fn, |
| 258 | process_context_args)) |
| 259 | p.start() |
| 260 | self.processes.append(p) |
| 261 | |
| 262 | self.advance(gen) |
| 263 | while self.processing_count > 0: |
| 264 | while True: |
| 265 | try: |
| 266 | # Read from result queue in a responsive fashion. If available, |
| 267 | # this will return a normal result immediately or a heartbeat on |
| 268 | # heartbeat timeout (default 1 second). |
| 269 | result = self._get_result_from_queue() |
| 270 | except: |
| 271 | # TODO(machenbach): Handle a few known types of internal errors |
| 272 | # gracefully, e.g. missing test files. |
| 273 | logging.exception('Internal error in a worker process.') |
| 274 | internal_error = True |
| 275 | continue |
| 276 | finally: |
| 277 | if self.abort_now: |
| 278 | # SIGINT, SIGTERM or internal hard timeout. |
| 279 | return |
| 280 | |
| 281 | yield result |
| 282 | break |
| 283 |