hub / github.com/dask/dask / test_anom_mean

Function test_anom_mean

dask/tests/test_order.py:1433–1495
()


def test_anom_mean():
    np = pytest.importorskip("numpy")
    xr = pytest.importorskip("xarray")

    import dask.array as da

    data = da.random.random((200, 1), chunks=(1, -1))

    ngroups = 5
    arr = xr.DataArray(
        data,
        dims=["time", "x"],
        coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
    )

    clim = arr.groupby("day").mean(dim="time")
    anom = arr.groupby("day") - clim
    anom_mean = anom.mean(dim="time")
    graph = collections_to_expr([anom_mean]).optimize().__dask_graph__()
    _, dependents = get_deps(graph)
    diags, pressure = diagnostics(graph)
    # Encoding the "best" ordering for this graph is tricky. When inspecting the
    # visualization, one sees that there are small, connected tree-like steps at
    # the beginning (tested well below in anom_mean_raw) followed by a
    # concat+transpose per group (see ngroups above). This transpose task fans
    # out into many (20-30) getitem tasks that are tiny and feed into a
    # `mean_chunk` which is the primary reducer in this graph. Therefore we want
    # to run those as quickly as possible.
    # This is difficult to assert on but the pressure is an ok-ish proxy
    assert max(pressure) <= 178
    from collections import defaultdict

    count_dependents = defaultdict(set)
    for k in dict(graph).keys():
        count_dependents[len(dependents[k])].add(k)

    # array-taker has the most dependents, but it's not what we want to look at
    n_splits = sorted(count_dependents)[-2]
    # There is a transpose/stack group that is splitting into many tasks
    # see https://github.com/dask/dask/pull/10660#discussion_r1420571664
    # the name depends on the version of xarray
    assert n_splits > 30  # at time of writing 40
    transpose_tasks = count_dependents[n_splits]
    transpose_metrics = {k: diags[k] for k in transpose_tasks}
    assert len(transpose_metrics) == ngroups, {key_split(k) for k in diags}
    # This is a pretty tightly connected graph overall and we'll have to hold
    # many tasks in memory until this can complete. However, we should ensure
    # that we get to the mean_chunks asap while the transposes are released
    # quickly.
    # If this breaks, I suggest to visually inspect the graph and run the above
    # on a single threaded LocalCluster and verify that the progress is indeed
    # in five steps (i.e. five groups)
    ages_mean_chunks = {k: v.age for k, v in diags.items() if "mean_chunk" in k[0]}
    avg_age_mean_chunks = sum(ages_mean_chunks.values()) / len(ages_mean_chunks)
    max_age_mean_chunks = max(ages_mean_chunks.values())
    ages_transpose = {k: v.age for k, v in transpose_metrics.items()}
    assert max_age_mean_chunks > 900
    assert avg_age_mean_chunks > 100
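
The test finds the transpose tasks indirectly: it groups every key by how many dependents it has, then takes the second-largest fan-out, because the single task with the largest fan-out is not the one of interest. The sketch below replays that selection on a tiny hand-written graph using only the standard library. The graph, its key names, and the `invert_dependencies` helper are hypothetical and exist only for illustration; the real test gets its dependents mapping from `get_deps` on the optimized dask graph.

```python
from collections import defaultdict

# Hypothetical toy graph: each key maps to the set of keys it depends on.
# "taker" plays the role of the task with the largest fan-out, and the two
# "transpose-*" tasks play the role of the per-group transposes.
dependencies = {
    "taker": set(),
    "transpose-a": set(),
    "transpose-b": set(),
    "getitem-a0": {"transpose-a", "taker"},
    "getitem-a1": {"transpose-a", "taker"},
    "getitem-a2": {"transpose-a", "taker"},
    "getitem-b0": {"transpose-b", "taker"},
    "getitem-b1": {"transpose-b", "taker"},
    "getitem-b2": {"transpose-b", "taker"},
    "mean": {
        "getitem-a0", "getitem-a1", "getitem-a2",
        "getitem-b0", "getitem-b1", "getitem-b2",
    },
}


def invert_dependencies(dependencies):
    """Return a mapping from each key to the set of keys that depend on it."""
    dependents = {key: set() for key in dependencies}
    for key, deps in dependencies.items():
        for dep in deps:
            dependents[dep].add(key)
    return dependents


dependents = invert_dependencies(dependencies)

# Group keys by their number of dependents, as the test does.
count_dependents = defaultdict(set)
for key in dependencies:
    count_dependents[len(dependents[key])].add(key)

# The largest fan-out belongs to "taker"; the second largest identifies
# the transpose-like tasks.
n_splits = sorted(count_dependents)[-2]
transpose_tasks = count_dependents[n_splits]

print(n_splits, sorted(transpose_tasks))
```

In this toy graph the selected group contains one task per "group" (`a` and `b`), which mirrors the test's check that the number of selected transpose tasks equals `ngroups`.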

Callers

nothing calls this directly

Calls 15

collections_to_expr · Function · 0.90
get_deps · Function · 0.90
diagnostics · Function · 0.90
max · Function · 0.85
key_split · Function · 0.85
sum · Function · 0.70
random · Method · 0.45
arange · Method · 0.45
mean · Method · 0.45
groupby · Method · 0.45
__dask_graph__ · Method · 0.45
optimize · Method · 0.45

Tested by

no test coverage detected
