(ray_start_regular_shared, restore_data_context)
| 553 | |
| 554 | |
def test_union(ray_start_regular_shared, restore_data_context):
    """Check row/block/sum totals produced by ``Dataset.union``.

    Exercises three cases:
    * a materialized ``range`` dataset unioned with four more copies of itself;
    * that result unioned with itself;
    * unions that start from an in-memory ``from_items`` dataset.
    """
    # Force the hash-aggregate actor to request 0 CPUs. Otherwise the shuffle
    # task (1 CPU) plus an aggregator actor (~0.25 CPU) would exceed the 1 CPU
    # available in the test cluster and deadlock the scheduler.
    # NOTE(review): assumes the ``restore_data_context`` fixture yields the
    # active DataContext and restores it after the test — confirm in conftest.
    restore_data_context.hash_aggregate_operator_actor_num_cpus_override = 0

    num_rows, num_blocks, copies = 20, 10, 5
    row_total = sum(range(num_rows))  # 0 + 1 + ... + 19

    base = ray.data.range(num_rows, override_num_blocks=num_blocks).materialize()

    # Lazy union: the base dataset plus (copies - 1) further references to it.
    fivefold = base.union(*[base] * (copies - 1))
    assert fivefold._logical_plan.initial_num_blocks() == num_blocks * copies
    assert fivefold.count() == num_rows * copies
    assert fivefold.sum() == row_total * copies

    # Unioning the lazy result with itself doubles every total.
    doubled = fivefold.union(fivefold)
    assert doubled.count() == 2 * num_rows * copies
    assert doubled.sum() == 2 * row_total * copies

    # Unions that begin from a small in-memory dataset.
    items = [1, 2, 3, 4, 5]
    small = ray.data.from_items(items)
    assert small.count() == len(items)
    small = small.union(small)
    assert small.count() == 2 * len(items)
    small = small.union(doubled)
    assert small.count() == 2 * len(items) + 2 * num_rows * copies
| 580 | |
| 581 | |
| 582 | def test_block_builder_for_block(ray_start_regular_shared): |
nothing calls this directly
no test coverage detected
searching dependent graphs…