MCPcopy Index your code
hub / github.com/nodejs/node / imap_unordered

Method imap_unordered

deps/v8/tools/testrunner/local/pool.py:226–293  ·  view source on GitHub ↗

Maps function "fn" to items in generator "gen" on the worker processes in an arbitrary order. The items are expected to be lists of arguments to the function. Returns a results iterator. A result value of type MaybeResult either indicates a heartbeat of the runner, i.e. indicating th

(self, fn, gen,
                     process_context_fn=None, process_context_args=None)

Source from the content-addressed store, hash-verified

224 )
225
226 def imap_unordered(self, fn, gen,
227 process_context_fn=None, process_context_args=None):
228 """Maps function "fn" to items in generator "gen" on the worker processes
229 in an arbitrary order. The items are expected to be lists of arguments to
230 the function. Returns a results iterator. A result value of type
231 MaybeResult either indicates a heartbeat of the runner, i.e. indicating
232 that the runner is still waiting for the result to be computed, or it wraps
233 the real result.
234
235 Args:
236 process_context_fn: Function executed once by each worker. Expected to
237 return a process-context object. If present, this object is passed
238 as additional argument to each call to fn.
239 process_context_args: List of arguments for the invocation of
240 process_context_fn. All arguments will be pickled and sent beyond the
241 process boundary.
242 """
243 if self.terminated:
244 return
245 try:
246 internal_error = False
247 gen = iter(gen)
248 self.advance = self._advance_more
249
250 # Disable sigint and sigterm to prevent subprocesses from capturing the
251 # signals.
252 with without_sig():
253 for w in range(self.num_workers):
254 p = Process(target=Worker, args=(fn,
255 self.work_queue,
256 self.done_queue,
257 process_context_fn,
258 process_context_args))
259 p.start()
260 self.processes.append(p)
261
262 self.advance(gen)
263 while self.processing_count > 0:
264 while True:
265 try:
266 # Read from result queue in a responsive fashion. If available,
267 # this will return a normal result immediately or a heartbeat on
268 # heartbeat timeout (default 1 second).
269 result = self._get_result_from_queue()
270 except:
271 # TODO(machenbach): Handle a few known types of internal errors
272 # gracefully, e.g. missing test files.
273 logging.exception('Internal error in a worker process.')
274 internal_error = True
275 continue
276 finally:
277 if self.abort_now:
278 # SIGINT, SIGTERM or internal hard timeout.
279 return
280
281 yield result
282 break
283

Callers 9

testNormalMethod · 0.95
testExceptionMethod · 0.95
testAddMethod · 0.95
resultsMethod · 0.95
MainFunction · 0.80
workbench.pyFile · 0.80
write_instrumentedFunction · 0.80
mergeFunction · 0.80

Calls 9

_terminateMethod · 0.95
without_sigFunction · 0.85
advanceMethod · 0.80
exceptionMethod · 0.80
rangeFunction · 0.50
ProcessFunction · 0.50
startMethod · 0.45
appendMethod · 0.45

Tested by 4

testNormalMethod · 0.76
testExceptionMethod · 0.76
testAddMethod · 0.76
MainFunction · 0.64