def wait_on(
    *collections,
    split_every: float | Literal[False] | None = None,
):
    """Ensure that all chunks of all input collections have been computed before
    computing the dependents of any of the chunks.

    The following example creates a dask array ``u`` that, when used in a computation,
    will only proceed when all chunks of the array ``x`` have been computed, but
    otherwise matches ``x``:

    >>> import dask.array as da
    >>> x = da.ones(10, chunks=5)
    >>> u = wait_on(x)

    The following example will create two arrays ``u`` and ``v`` that, when used in a
    computation, will only proceed when all chunks of the arrays ``x`` and ``y`` have
    been computed but otherwise match ``x`` and ``y``:

    >>> x = da.ones(10, chunks=5)
    >>> y = da.zeros(10, chunks=5)
    >>> u, v = wait_on(x, y)

    Parameters
    ----------
    collections
        Zero or more Dask collections or nested structures of Dask collections
    split_every
        See :func:`checkpoint`

    Returns
    -------
    Same as ``collections``
        Dask collection of the same type as the input, which computes to the same value,
        or a nested structure equivalent to the input where the original collections
        have been replaced.
        The keys of the regenerated nodes of the new collections will be different from
        the original ones, so that they can be used within the same graph.
    """
    blocker = checkpoint(*collections, split_every=split_every)

    def block_one(coll):
        tok = tokenize(coll, blocker)
        dsks = []
        rename = {}
        for prev_name in get_collection_names(coll):
            new_name = "wait_on-" + tokenize(prev_name, tok)
            rename[prev_name] = new_name
            layer = _build_map_layer(
                chunks.bind, prev_name, new_name, coll, dependencies=(blocker,)
            )
            dsks.append(
                HighLevelGraph.from_collections(
                    new_name, layer, dependencies=(coll, blocker)
                )
            )
        dsk = HighLevelGraph.merge(*dsks)
        rebuild, args = coll.__dask_postpersist__()
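The dependency structure that ``wait_on`` builds can be illustrated without Dask. The following is a minimal pure-Python sketch under stated assumptions. It uses a toy graph, a dict mapping each key to a tuple ``(function, *dependency_keys)``. A single barrier node depends on every chunk and stands in for ``checkpoint(...)`` without tree reduction. Each renamed chunk then depends on both the original chunk and the barrier and simply returns the chunk's value, which is the role ``chunks.bind`` appears to play in ``block_one``. The helpers ``_token``, ``toy_wait_on`` and ``execute`` and the ``"checkpoint-"`` key prefix are hypothetical. Only the ``"wait_on-"`` prefix comes from the code above.

```python
import hashlib
from graphlib import TopologicalSorter


def _token(*parts):
    # Hypothetical, deterministic stand-in for dask.base.tokenize.
    return hashlib.sha256(repr(parts).encode()).hexdigest()[:8]


def toy_wait_on(graph, chunk_keys):
    """Return (new_graph, rename); every renamed chunk waits on one shared barrier."""
    new_graph = dict(graph)
    # A single barrier node that depends on every input chunk (no tree reduction).
    blocker = "checkpoint-" + _token(*chunk_keys)
    new_graph[blocker] = (lambda *_: None, *chunk_keys)
    rename = {}
    for key in chunk_keys:
        # New keys differ from the originals, so both can coexist in one graph.
        new_key = "wait_on-" + _token(key, blocker)
        rename[key] = new_key
        # Bind-style node: depends on the chunk and the barrier, returns the chunk.
        new_graph[new_key] = (lambda node, *_: node, key, blocker)
    return new_graph, rename


def execute(graph, log):
    """Run every task in dependency order, appending each key to ``log``."""
    deps = {key: set(task[1:]) for key, task in graph.items()}
    results = {}
    for key in TopologicalSorter(deps).static_order():
        func, *args = graph[key]
        results[key] = func(*(results[a] for a in args))
        log.append(key)
    return results


# Two "chunks" of a toy collection x.
graph = {"x-0": (lambda: 1.0,), "x-1": (lambda: 1.0,)}
new_graph, rename = toy_wait_on(graph, ["x-0", "x-1"])
# y-0 only uses the first chunk, but must still wait for x-1 through the barrier.
new_graph["y-0"] = (lambda v: v * 10, rename["x-0"])
log = []
results = execute(new_graph, log)
```

Any valid execution order of this toy graph runs ``x-1`` before ``y-0``, even though ``y-0`` never reads ``x-1``. The renamed nodes still compute to the original chunk values.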
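The docstring delegates ``split_every`` to :func:`checkpoint` without describing it. The sketch below rests on an assumption that should be checked against the ``checkpoint`` documentation. It treats ``split_every`` as the fan-in of a tree reduction, as the parameter of the same name does in ``dask.array.reduction``. Instead of one barrier node depending on all N chunks, barrier nodes are built in levels, and each depends on at most ``split_every`` nodes of the level below. ``False`` yields a single flat barrier. The function names ``tree_barrier`` and ``reachable`` and the ``"barrier-<level>-<index>"`` key scheme are hypothetical. The sketch also does not model whatever default Dask applies when ``split_every`` is ``None``.

```python
def tree_barrier(keys, split_every):
    """Build a tree of barrier nodes over ``keys``; return (graph, root_key).

    ``graph`` maps each barrier key to the tuple of keys it depends on. Each
    barrier depends on at most ``split_every`` nodes of the level below;
    split_every=False builds one flat barrier (assumed semantics).
    """
    if split_every is None:
        raise ValueError("this sketch does not model the default used for None")
    if split_every is not False and split_every < 2:
        raise ValueError("split_every must be >= 2 or False")
    if not keys:
        return {"barrier-0-0": ()}, "barrier-0-0"
    graph = {}
    layer = list(keys)
    level = 0
    while True:
        fan_in = len(layer) if split_every is False else int(split_every)
        next_layer = []
        for j, start in enumerate(range(0, len(layer), fan_in)):
            name = f"barrier-{level}-{j}"
            graph[name] = tuple(layer[start : start + fan_in])
            next_layer.append(name)
        if len(next_layer) == 1:
            return graph, next_layer[0]
        layer = next_layer
        level += 1


def reachable(graph, root):
    """Return every key that ``root`` transitively depends on."""
    seen, stack = set(), [root]
    while stack:
        for dep in graph.get(stack.pop(), ()):
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return seen


keys = [f"x-{i}" for i in range(10)]
tree, root = tree_barrier(keys, split_every=4)
flat, flat_root = tree_barrier(keys, split_every=False)
```

With ten keys and a fan-in of 4, the tree has three first-level barriers and one root. The flat variant has a single node with ten dependencies. Either way the root transitively depends on every chunk, which is the property the blocking step relies on.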