Fire off a task to the thread pool
(chunksize)
| 486 | raise ValueError("Found no accessible jobs in dask") |
| 487 | |
| 488 | def fire_tasks(chunksize): |
| 489 | """Fire off a task to the thread pool""" |
| 490 | # Determine chunksize and/or number of tasks to submit |
| 491 | nready = len(state["ready"]) |
| 492 | if chunksize == -1: |
| 493 | ntasks = nready |
| 494 | chunksize = -(ntasks // -num_workers) |
| 495 | else: |
| 496 | used_workers = -(len(state["running"]) // -chunksize) |
| 497 | avail_workers = max(num_workers - used_workers, 0) |
| 498 | ntasks = min(nready, chunksize * avail_workers) |
| 499 | |
| 500 | # Prep all ready tasks for submission |
| 501 | args = [] |
| 502 | for _ in range(ntasks): |
| 503 | # Get the next task to compute (most recently added) |
| 504 | key = state["ready"].pop() |
| 505 | # Notify task is running |
| 506 | state["running"].add(key) |
| 507 | for f in pretask_cbs: |
| 508 | f(key, dsk, state) |
| 509 | |
| 510 | # Prep args to send |
| 511 | data = { |
| 512 | dep: state["cache"][dep] for dep in state["dependencies"][key] |
| 513 | } |
| 514 | args.append( |
| 515 | ( |
| 516 | key, |
| 517 | dumps((dsk[key], data)), |
| 518 | dumps, |
| 519 | loads, |
| 520 | get_id, |
| 521 | pack_exception, |
| 522 | ) |
| 523 | ) |
| 524 | |
| 525 | # Batch submit |
| 526 | for i in range(-(len(args) // -chunksize)): |
| 527 | each_args = args[i * chunksize : (i + 1) * chunksize] |
| 528 | if not each_args: |
| 529 | break |
| 530 | fut = submit(batch_execute_tasks, each_args) |
| 531 | fut.add_done_callback(queue.put) |
| 532 | |
| 533 | # Main loop, wait on tasks to finish, insert new ones |
| 534 | while state["waiting"] or state["ready"] or state["running"]: |