
Function wait_on

dask/graph_manipulation.py:478–540

Ensure that all chunks of all input collections have been computed before computing the dependents of any of the chunks.

(
    *collections,
    split_every: float | Literal[False] | None = None,
)
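As a usage sketch of this signature, the snippet below passes one collection, then several collections together with `split_every`. The value `split_every=2` is an arbitrary choice for illustration; its meaning is documented under `checkpoint`, to which the argument is forwarded.

```python
import dask.array as da
from dask.graph_manipulation import wait_on

x = da.ones(10, chunks=5)
y = da.zeros(10, chunks=5)

# One collection in, one wrapped collection out.
u = wait_on(x)

# Several collections in, one wrapped collection per input out.
# split_every is forwarded to checkpoint(); 2 is just an example value.
v, w = wait_on(x, y, split_every=2)

u_val = u.compute()
v_val = v.compute()
w_val = w.compute()
```

The wrapped arrays compute to the same values as their inputs; only the scheduling constraints differ.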

Source from the content-addressed store, hash-verified

def wait_on(
    *collections,
    split_every: float | Literal[False] | None = None,
):
    """Ensure that all chunks of all input collections have been computed before
    computing the dependents of any of the chunks.

    The following example creates a dask array ``u`` that, when used in a computation,
    will only proceed when all chunks of the array ``x`` have been computed, but
    otherwise matches ``x``:

    >>> import dask.array as da
    >>> x = da.ones(10, chunks=5)
    >>> u = wait_on(x)

    The following example will create two arrays ``u`` and ``v`` that, when used in a
    computation, will only proceed when all chunks of the arrays ``x`` and ``y`` have
    been computed but otherwise match ``x`` and ``y``:

    >>> x = da.ones(10, chunks=5)
    >>> y = da.zeros(10, chunks=5)
    >>> u, v = wait_on(x, y)

    Parameters
    ----------
    collections
        Zero or more Dask collections or nested structures of Dask collections
    split_every
        See :func:`checkpoint`

    Returns
    -------
    Same as ``collections``
        Dask collection of the same type as the input, which computes to the same value,
        or a nested structure equivalent to the input where the original collections
        have been replaced.
        The keys of the regenerated nodes of the new collections will be different from
        the original ones, so that they can be used within the same graph.
    """
    blocker = checkpoint(*collections, split_every=split_every)

    def block_one(coll):
        tok = tokenize(coll, blocker)
        dsks = []
        rename = {}
        for prev_name in get_collection_names(coll):
            new_name = "wait_on-" + tokenize(prev_name, tok)
            rename[prev_name] = new_name
            layer = _build_map_layer(
                chunks.bind, prev_name, new_name, coll, dependencies=(blocker,)
            )
            dsks.append(
                HighLevelGraph.from_collections(
                    new_name, layer, dependencies=(coll, blocker)
                )
            )
        dsk = HighLevelGraph.merge(*dsks)
        rebuild, args = coll.__dask_postpersist__()
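The renaming step in the listing (`"wait_on-" + tokenize(...)`) and the docstring's note on nested structures can be checked from the outside. The following is a sketch under the assumption that the input is a plain dask array; the variable names are illustrative only.

```python
import dask.array as da
from dask.graph_manipulation import wait_on

x = da.ones(10, chunks=5)
u = wait_on(x)

# The wrapped array carries a newly generated name with the "wait_on-" prefix.
renamed = u.name.startswith("wait_on-") and u.name != x.name

# The original chunk keys remain in the wrapped graph as dependencies.
graph = dict(u.__dask_graph__())
originals_kept = all(key in graph for key in x.__dask_keys__())

# Nested containers are accepted; the same structure comes back,
# with each collection replaced by its wrapped counterpart.
nested = wait_on({"a": x, "b": [x + 1]})
nested_values = {
    "a": nested["a"].compute(),
    "b": nested["b"][0].compute(),
}
```

Because the regenerated keys differ from the originals, `x` and `u` can appear together in one computation without their keys colliding.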

Callers 3

test_wait_on_one · Function · 0.90
test_wait_on_many · Function · 0.90
test_split_every · Function · 0.90

Calls 4

unpack_collections · Function · 0.90
checkpoint · Function · 0.85
repack · Function · 0.85
block_one · Function · 0.85

Tested by 3

test_wait_on_one · Function · 0.72
test_wait_on_many · Function · 0.72
test_split_every · Function · 0.72
