(
ray_start_regular_shared_2_cpus,
ds_format,
num_parts,
configure_shuffle_method,
disable_fallback_to_object_extension,
)
@pytest.mark.parametrize("num_parts", [1, 30])
@pytest.mark.parametrize("ds_format", ["arrow", "pandas"])
def test_global_tabular_sum(
    ray_start_regular_shared_2_cpus,
    ds_format,
    num_parts,
    configure_shuffle_method,
    disable_fallback_to_object_extension,
):
    """Check the global ``sum`` aggregation for the "arrow" and "pandas" formats.

    Covers four scenarios: a shuffled column of 0..99, a dataset filtered
    down to zero rows, a column containing a single null, and a column made
    up entirely of nulls.

    Args:
        ray_start_regular_shared_2_cpus: pytest fixture, not referenced in the
            body (presumably provides the Ray cluster -- confirm in conftest).
        ds_format: ``"arrow"`` uses the dataset as constructed; ``"pandas"``
            first routes it through an identity ``map_batches`` with
            ``batch_format="pandas"``.
        num_parts: number of partitions passed to ``repartition``.
        configure_shuffle_method: pytest fixture, not referenced in the body.
        disable_fallback_to_object_extension: pytest fixture, not referenced
            in the body.
    """
    # The seed is time-based, so it is printed to let a failing shuffle order
    # be replayed. The message carries this test's actual name so the seed
    # can be found by searching the log for the failing test.
    seed = int(time.time())
    print(f"Seeding RNG for test_global_tabular_sum with: {seed}")
    random.seed(seed)
    xs = list(range(100))
    random.shuffle(xs)

    def _as_requested_format(ds):
        # Identity map_batches; only applied for the "pandas" parametrization.
        if ds_format == "pandas":
            return ds.map_batches(lambda x: x, batch_size=None, batch_format="pandas")
        return ds

    # Test built-in global sum aggregation
    ds = _as_requested_format(
        ray.data.from_items([{"A": x} for x in xs]).repartition(num_parts)
    )
    assert ds.sum("A") == 4950  # 0 + 1 + ... + 99

    # Test empty dataset: range(10) yields ids 0..9, so `id > 10` keeps no rows.
    ds = _as_requested_format(ray.data.range(10))
    assert ds.filter(lambda r: r["id"] > 10).sum("id") is None

    # Test built-in global sum aggregation with nans
    nan_ds = _as_requested_format(
        ray.data.from_items([{"A": x} for x in xs] + [{"A": None}]).repartition(
            num_parts
        )
    )
    # With default arguments the single null does not change the total.
    assert nan_ds.sum("A") == 4950
    # Test ignore_nulls=False
    assert pd.isnull(nan_ds.sum("A", ignore_nulls=False))

    # Test all nans
    nan_ds = _as_requested_format(
        ray.data.from_items([{"A": None}] * len(xs)).repartition(num_parts)
    )
    assert nan_ds.sum("A") is None
    assert pd.isnull(nan_ds.sum("A", ignore_nulls=False))
| 75 | |
| 76 | |
| 77 | def test_random_block_order_schema( |
nothing calls this directly
no test coverage detected
searching dependent graphs…