Function process_runnables

dask/order.py:299–403

Compute all currently runnable paths and either cache or execute them. This is designed to ensure we are running tasks that are free to execute (e.g. the result of a splitter task) not too eagerly.

()

Source from the content-addressed store, hash-verified

@_with_offset
def process_runnables() -> None:
    """Compute all currently runnable paths and either cache or execute them

    This is designed to ensure we are running tasks that are free to execute
    (e.g. the result of a splitter task) not too eagerly. If we executed
    such free tasks too early we'd be walking the graph in a too wide /
    breadth first fashion that is not optimal. If instead we were to only
    execute them once they are needed for a final result, this can cause
    very high memory pressure since valuable reducers are executed too
    late.

    The strategy here is to take all runnable tasks and walk forwards until
    we hit a reducer node (i.e. a node with more than one dependency). We
    will remember/cache the path to this reducer node.
    If this path leads to a leaf or if we find enough runnable paths for a
    reducer to be runnable, we will execute the path.

    If instead of a reducer a splitter is encountered that is runnable, we
    will follow its splitter paths individually and apply the same logic to
    each branch.
    """
    while runnable:
        candidates = runnable.copy()
        runnable.clear()
        while candidates:
            key = candidates.pop()
            if key in runnable_hull or key in result:
                continue
            if key in leaf_nodes:
                add_to_result(key)
                continue
            path = [key]
            branches = deque([(0, path)])

            while branches:
                nsplits, path = branches.popleft()
                while True:
                    # Loop invariant. Too expensive to compute at runtime
                    # assert not set(known_runnable_paths).intersection(runnable_hull)
                    current = path[-1]
                    runnable_hull.add(current)
                    deps_downstream = dependents[current]
                    deps_upstream = dependencies[current]
                    if not deps_downstream:
                        # FIXME: The fact that it is possible for
                        # num_needed[current] == 0 means we're doing some
                        # work twice
                        if num_needed[current] <= 1:
                            for k in path:
                                add_to_result(k)
                        else:
                            runnable_hull.discard(current)
                    elif len(path) == 1 or len(deps_upstream) == 1:
                        if len(deps_downstream) > 1:
                            nsplits += 1
                            for d in sorted(deps_downstream, key=sort_key):
                                # This ensures we're only considering splitters
                                # that are genuinely splitting and not
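
The excerpt above is cut off, so the following is only a simplified, standalone sketch of the strategy the docstring describes: walk forward from a runnable task until a reducer is reached, cache the path, and execute it once the path ends in a leaf or the reducer has enough cached paths. It is not dask's implementation. The names `build_dependents`, `walk_to_reducer`, and `process`, the example graph, and the decision to leave splitters unexpanded are all assumptions made for illustration.

```python
from collections import defaultdict


def build_dependents(dependencies):
    """Invert a {task: set_of_dependencies} mapping (hypothetical helper)."""
    dependents = defaultdict(set)
    for task, deps in dependencies.items():
        dependents[task]  # ensure every task has an entry, even leaves
        for dep in deps:
            dependents[dep].add(task)
    return dict(dependents)


def walk_to_reducer(start, dependencies, dependents):
    """Follow the linear chain downstream of ``start``.

    Returns (path, kind, reducer) where kind is "leaf", "reducer" or
    "splitter"; reducer is the reducer key or None.
    """
    path = [start]
    while True:
        current = path[-1]
        downstream = dependents[current]
        if not downstream:
            return path, "leaf", None
        if len(downstream) > 1:
            # A splitter: the real function follows each branch; this
            # sketch simply stops here.
            return path, "splitter", None
        (nxt,) = downstream
        if len(dependencies[nxt]) > 1:
            # nxt has more than one dependency, i.e. it is a reducer.
            return path, "reducer", nxt
        path.append(nxt)


def process(runnable, dependencies, dependents):
    """Cache paths per reducer; execute them once the reducer is satisfied."""
    result = []
    cached = defaultdict(list)  # reducer -> paths waiting for it
    for key in runnable:
        path, kind, reducer = walk_to_reducer(key, dependencies, dependents)
        if kind == "leaf":
            result.extend(path)
        elif kind == "reducer":
            cached[reducer].append(path)
            if len(cached[reducer]) == len(dependencies[reducer]):
                for waiting in cached.pop(reducer):
                    result.extend(waiting)
                result.append(reducer)
    return result, dict(cached)


# Example graph (assumed): a -> b -> c is a chain, r reduces x and y,
# s splits into s1 and s2.
dependencies = {
    "a": set(), "b": {"a"}, "c": {"b"},
    "x": set(), "y": set(), "r": {"x", "y"},
    "s": set(), "s1": {"s"}, "s2": {"s"},
}
dependents = build_dependents(dependencies)
```

In this sketch, a path ending in a leaf is executed immediately, while a path ending at a reducer waits in `cached` until every dependency of that reducer has a cached path. The real function additionally tracks a runnable hull, per-task `num_needed` counts, and split depth, none of which are modelled here.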

Callers 1

order · Function · 0.85

Calls 5

add_to_result · Function · 0.85
pop · Method · 0.80
copy · Method · 0.45
clear · Method · 0.45
add · Method · 0.45

Tested by

no test coverage detected
