()
| 1431 | |
| 1432 | |
def test_anom_mean():
    """Check the task ordering of an xarray groupby-anomaly-mean graph.

    A (200, 1) dask-backed ``DataArray`` with one row per chunk is labelled
    with a cyclic "day" coordinate taking ``ngroups`` distinct values. The
    per-day mean is subtracted from each group and the result is averaged
    over time. The optimized graph is passed to ``diagnostics`` and the test
    bounds ``max(pressure)`` and the ages of the ``mean_chunk`` tasks.
    """
    np = pytest.importorskip("numpy")
    xr = pytest.importorskip("xarray")

    import dask.array as da
    from collections import defaultdict

    ngroups = 5
    data = da.random.random((200, 1), chunks=(1, -1))
    day_labels = np.arange(data.shape[0]) % ngroups
    arr = xr.DataArray(
        data,
        dims=["time", "x"],
        coords={"day": ("time", day_labels)},
    )

    by_day = arr.groupby("day")
    clim = by_day.mean(dim="time")
    anom = arr.groupby("day") - clim
    anom_mean = anom.mean(dim="time")

    optimized = collections_to_expr([anom_mean]).optimize()
    graph = optimized.__dask_graph__()
    _, dependents = get_deps(graph)
    diags, pressure = diagnostics(graph)

    # The "best" ordering for this graph is hard to encode. Visually, the
    # graph starts with small, connected tree-like steps (covered by the
    # anom_mean_raw test) followed by one concat+transpose per group (see
    # ngroups above). Each transpose fans out into many tiny getitem tasks
    # feeding a `mean_chunk`, the primary reducer of this graph, so those
    # should run as early as possible.
    # That is difficult to assert on directly; the pressure is an ok-ish proxy.
    assert max(pressure) <= 178

    # Bucket every key of the graph by how many dependents it has.
    keys_by_fanout = defaultdict(set)
    for key in dict(graph):
        fanout = len(dependents[key])
        keys_by_fanout[fanout].add(key)

    # array-taker has the most dependents, but it's not what we want to look
    # at, hence the second-largest fan-out is picked.
    n_splits = sorted(keys_by_fanout)[-2]
    # There is a transpose/stack group that is splitting into many tasks,
    # see https://github.com/dask/dask/pull/10660#discussion_r1420571664
    # Its task name depends on the version of xarray, which is why it is
    # located by fan-out rather than by name.
    assert n_splits > 30  # at time of writing 40
    transpose_tasks = keys_by_fanout[n_splits]
    transpose_metrics = {task: diags[task] for task in transpose_tasks}
    assert len(transpose_metrics) == ngroups, {key_split(k) for k in diags}

    # The graph is tightly connected overall, so many tasks have to be held
    # in memory until it completes. Still, the mean_chunks should be reached
    # asap while the transposes are released quickly.
    # If this breaks, inspect the graph visually and run the above on a
    # single threaded LocalCluster to verify that progress indeed happens in
    # five steps (i.e. five groups).
    ages_mean_chunks = {
        key: info.age for key, info in diags.items() if "mean_chunk" in key[0]
    }
    mean_chunk_ages = list(ages_mean_chunks.values())
    avg_age_mean_chunks = sum(mean_chunk_ages) / len(mean_chunk_ages)
    max_age_mean_chunks = max(mean_chunk_ages)
    # NOTE(review): not referenced by any assertion visible in this block;
    # kept as-is in case later lines outside this view use it - confirm.
    ages_transpose = {task: info.age for task, info in transpose_metrics.items()}
    assert max_age_mean_chunks > 900
    assert avg_age_mean_chunks > 100
nothing calls this directly
no test coverage detected
searching dependent graphs…