MCPcopy
hub / github.com/dask/dask / fire_tasks

Function fire_tasks

dask/local.py:488–531  ·  view source on GitHub ↗

Fire off a task to the thread pool

(chunksize)

Source from the content-addressed store, hash-verified

486 raise ValueError("Found no accessible jobs in dask")
487
488 def fire_tasks(chunksize):
489 """Fire off a task to the thread pool"""
490 # Determine chunksize and/or number of tasks to submit
491 nready = len(state["ready"])
492 if chunksize == -1:
493 ntasks = nready
494 chunksize = -(ntasks // -num_workers)
495 else:
496 used_workers = -(len(state["running"]) // -chunksize)
497 avail_workers = max(num_workers - used_workers, 0)
498 ntasks = min(nready, chunksize * avail_workers)
499
500 # Prep all ready tasks for submission
501 args = []
502 for _ in range(ntasks):
503 # Get the next task to compute (most recently added)
504 key = state["ready"].pop()
505 # Notify task is running
506 state["running"].add(key)
507 for f in pretask_cbs:
508 f(key, dsk, state)
509
510 # Prep args to send
511 data = {
512 dep: state["cache"][dep] for dep in state["dependencies"][key]
513 }
514 args.append(
515 (
516 key,
517 dumps((dsk[key], data)),
518 dumps,
519 loads,
520 get_id,
521 pack_exception,
522 )
523 )
524
525 # Batch submit
526 for i in range(-(len(args) // -chunksize)):
527 each_args = args[i * chunksize : (i + 1) * chunksize]
528 if not each_args:
529 break
530 fut = submit(batch_execute_tasks, each_args)
531 fut.add_done_callback(queue.put)
532
533 # Main loop, wait on tasks to finish, insert new ones
534 while state["waiting"] or state["ready"] or state["running"]:

Callers 1

get_asyncFunction · 0.85

Calls 5

maxFunction · 0.85
minFunction · 0.85
popMethod · 0.80
fFunction · 0.50
addMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…