```python
@_with_offset
def process_runnables() -> None:
    """Compute all currently runnable paths and either cache or execute them.

    This is designed to ensure that tasks which are free to execute (e.g.
    the result of a splitter task) are not run too eagerly. If we executed
    such free tasks too early, we would walk the graph in an overly wide,
    breadth-first fashion, which is not optimal. If instead we only executed
    them once they are needed for a final result, this could cause very high
    memory pressure, since valuable reducers would be executed too late.

    The strategy here is to take all runnable tasks and walk forwards until
    we hit a reducer node (i.e. a node with more than one dependency). We
    remember/cache the path to this reducer node.
    If this path leads to a leaf, or if we find enough runnable paths for a
    reducer to become runnable, we execute the path.

    If, instead of a reducer, a runnable splitter is encountered, we follow
    each of its downstream paths individually and apply the same logic to
    each branch.
    """
    while runnable:
        candidates = runnable.copy()
        runnable.clear()
        while candidates:
            key = candidates.pop()
            if key in runnable_hull or key in result:
                continue
            if key in leaf_nodes:
                add_to_result(key)
                continue
            path = [key]
            branches = deque([(0, path)])

            while branches:
                nsplits, path = branches.popleft()
                while True:
                    # Loop invariant. Too expensive to compute at runtime
                    # assert not set(known_runnable_paths).intersection(runnable_hull)
                    current = path[-1]
                    runnable_hull.add(current)
                    deps_downstream = dependents[current]
                    deps_upstream = dependencies[current]
                    if not deps_downstream:
                        # FIXME: The fact that it is possible for
                        # num_needed[current] == 0 means we're doing some
                        # work twice
                        if num_needed[current] <= 1:
                            for k in path:
                                add_to_result(k)
                        else:
                            runnable_hull.discard(current)
                    elif len(path) == 1 or len(deps_upstream) == 1:
                        if len(deps_downstream) > 1:
                            nsplits += 1
                            for d in sorted(deps_downstream, key=sort_key):
                                # This ensures we're only considering splitters
                                # that are genuinely splitting and not
```
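The forward walk described in the docstring of `process_runnables` can be illustrated on a toy graph. What follows is a minimal sketch under simplifying assumptions, not Dask's implementation: the graph is assumed to be a plain `dict` mapping each key to the set of keys it depends on, "executing" a task just appends its key to an output list, and `sorted()` stands in for `sort_key`. The names `path_order`, `execute`, `done`, and `cached` are hypothetical, and the bookkeeping that the real function does with `num_needed`, `nsplits`, and `runnable_hull` is omitted.

```python
from collections import defaultdict, deque


def path_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Hypothetical toy ordering: walk forward from runnable keys until a
    reducer or a leaf is reached (a sketch, not Dask's algorithm)."""
    # Reverse edges: which keys consume each key's result.
    dependents: dict[str, set[str]] = {key: set() for key in dependencies}
    for key, deps in dependencies.items():
        for dep in deps:
            dependents[dep].add(key)

    order: list[str] = []
    done: set[str] = set()
    # Paths that stopped in front of a reducer, grouped by that reducer.
    cached: dict[str, list[list[str]]] = defaultdict(list)

    def execute(path: list[str]) -> None:
        # Append every not-yet-executed key on the path to the order.
        for key in path:
            if key not in done:
                done.add(key)
                order.append(key)

    # Keys without dependencies are runnable from the start.
    runnable = deque(sorted(k for k, deps in dependencies.items() if not deps))
    while runnable:
        branches = deque([[runnable.popleft()]])
        while branches:
            path = branches.popleft()
            while True:
                current = path[-1]
                # sorted() is an assumed stand-in for sort_key.
                downstream = sorted(dependents[current])
                if not downstream:
                    execute(path)  # the path reaches a leaf: run it now
                    break
                if len(downstream) == 1 and len(dependencies[downstream[0]]) == 1:
                    path.append(downstream[0])  # linear chain: keep walking
                    continue
                for nxt in downstream:
                    if len(dependencies[nxt]) == 1:
                        # Splitter: follow each single-dependency branch separately.
                        branches.append(path + [nxt])
                        continue
                    # nxt is a reducer: cache the path that leads to it.
                    cached[nxt].append(path)
                    reachable = done | {p[-1] for p in cached[nxt]}
                    if dependencies[nxt] <= reachable:
                        # Every input is done or ends a cached path: run them all,
                        # then continue walking from the now-runnable reducer.
                        for p in cached.pop(nxt):
                            execute(p)
                        branches.append([nxt])
                break
    return order
```

For a splitter `s` feeding two chains that meet in a reducer `r`, the sketch finishes the `x` branch before starting on the `y` branch:

```python
graph = {"s": set(), "x": {"s"}, "y": {"s"}, "x2": {"x"}, "y2": {"y"}, "r": {"x2", "y2"}}
print(path_order(graph))  # ['s', 'x', 'x2', 'y', 'y2', 'r']
```

A level-by-level (breadth-first) walk would instead run `s`, `x`, and `y` before either `x2` or `y2`.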