| 1095 | ) |
| 1096 | @pytest.mark.parametrize("split_out", [1, 2]) |
| 1097 | def test_dataframe_aggregations_multilevel( |
| 1098 | xfail, grouper_id, grouper, agg_func, split_out |
| 1099 | ): |
| 1100 | if agg_func in ("cov", "corr") and split_out == 1 and grouper_id == 4: |
| 1101 | xfail("Unknown issue") |
| 1102 | elif agg_func in ("cov", "corr") and split_out > 1: |
| 1103 | xfail("https://github.com/dask/dask/issues/9509") |
| 1104 | |
| 1105 | sort = split_out == 1 # Don't sort for split_out > 1 |
| 1106 | |
def call(g, m, **kwargs):
    """Invoke the method named ``m`` on ``g`` and return its result.

    ``m`` is resolved with ``getattr`` and the resulting attribute is
    called with ``kwargs`` only; no positional arguments are passed.
    """
    method = getattr(g, m)
    return method(**kwargs)
| 1109 | |
| 1110 | pdf = pd.DataFrame( |
| 1111 | { |
| 1112 | "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10, |
| 1113 | "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10, |
| 1114 | "d": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10, |
| 1115 | "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10, |
| 1116 | }, |
| 1117 | columns=["c", "b", "a", "d"], |
| 1118 | ) |
| 1119 | |
| 1120 | ddf = dd.from_pandas(pdf, npartitions=10) |
| 1121 | |
| 1122 | # covariance only works with N+1 columns |
| 1123 | if agg_func not in ("cov", "corr"): |
| 1124 | assert_eq( |
| 1125 | call(pdf.groupby(grouper(pdf), sort=sort)["c"], agg_func), |
| 1126 | call( |
| 1127 | ddf.groupby(grouper(ddf), sort=sort)["c"], |
| 1128 | agg_func, |
| 1129 | split_out=split_out, |
| 1130 | split_every=2, |
| 1131 | ), |
| 1132 | ) |
| 1133 | |
| 1134 | # not supported by pandas |
| 1135 | if agg_func != "nunique": |
| 1136 | assert_eq( |
| 1137 | call(pdf.groupby(grouper(pdf), sort=sort)[["c", "d"]], agg_func), |
| 1138 | call( |
| 1139 | ddf.groupby(grouper(ddf), sort=sort)[["c", "d"]], |
| 1140 | agg_func, |
| 1141 | split_out=split_out, |
| 1142 | split_every=2, |
| 1143 | ), |
| 1144 | ) |
| 1145 | |
| 1146 | if agg_func in ("cov", "corr"): |
| 1147 | # there are sorting issues between pandas and chunk cov w/dask |
| 1148 | df = call(pdf.groupby(grouper(pdf), sort=sort), agg_func).sort_index() |
| 1149 | cols = sorted(df.columns) |
| 1150 | df = df[cols] |
| 1151 | dddf = call( |
| 1152 | ddf.groupby(grouper(ddf), sort=sort), |
| 1153 | agg_func, |
| 1154 | split_out=split_out, |