Test that predicate pushdown works correctly with different join types. Filters on single-side predicates should be pushed past the join when appropriate: inner join — can push to either side; left outer join — can push to the left (preserved) side only; right outer join — can push to the right (preserved) side only.
(
ray_start_regular_shared_2_cpus, join_type, filter_side, should_push
)
| 720 | ids=["inner_left", "inner_right", "left_outer_left", "left_outer_right"], |
| 721 | ) |
| 722 | def test_join_with_predicate_pushdown( |
| 723 | ray_start_regular_shared_2_cpus, join_type, filter_side, should_push |
| 724 | ): |
| 725 | """Test that predicate pushdown works correctly with different join types. |
| 726 | |
| 727 | Filters on single-side predicates should push past the join when appropriate: |
| 728 | - Inner join: can push to either side |
| 729 | - Left outer: can push to left (preserved) side only |
| 730 | - Right outer: can push to right (preserved) side only |
| 731 | """ |
| 732 | from ray.data._internal.logical.optimizers import LogicalOptimizer |
| 733 | from ray.data._internal.util import MiB |
| 734 | from ray.data.expressions import col |
| 735 | |
| 736 | DataContext.get_current().target_max_block_size = 1 * MiB |
| 737 | |
| 738 | # Create datasets directly without map to allow filter pushdown through join |
| 739 | # Both have ids 0-31 with different value columns |
| 740 | left_data = [{"id": i, "left_val": i * 10} for i in range(32)] |
| 741 | right_data = [{"id": i, "right_val": i * 100} for i in range(32)] |
| 742 | |
| 743 | left_ds = ray.data.from_items(left_data) |
| 744 | right_ds = ray.data.from_items(right_data) |
| 745 | |
| 746 | # Join then filter |
| 747 | joined = left_ds.join( |
| 748 | right_ds, |
| 749 | join_type=join_type, |
| 750 | num_partitions=4, |
| 751 | on=("id",), |
| 752 | aggregator_ray_remote_args={"num_cpus": 0.01}, |
| 753 | ) |
| 754 | |
| 755 | # Filter on column from specified side |
| 756 | if filter_side == "left": |
| 757 | filtered_ds = joined.filter(expr=col("left_val") < 100) |
| 758 | else: |
| 759 | filtered_ds = joined.filter(expr=col("right_val") < 1000) |
| 760 | |
| 761 | left_pd = left_ds.to_pandas() |
| 762 | right_pd = right_ds.to_pandas() |
| 763 | |
| 764 | # Compute expected join result |
| 765 | if join_type == "inner": |
| 766 | expected_pd = left_pd.merge(right_pd, on="id", how="inner") |
| 767 | elif join_type == "left_outer": |
| 768 | expected_pd = left_pd.merge(right_pd, on="id", how="left") |
| 769 | else: |
| 770 | raise ValueError(f"Unsupported join type for this test: {join_type}") |
| 771 | |
| 772 | # Apply filter (must match what we filtered in Ray Data) |
| 773 | if filter_side == "left": |
| 774 | # For left-side filter, use notna() to include NaN rows from outer joins |
| 775 | expected_pd = expected_pd[expected_pd["left_val"] < 100] |
| 776 | else: |
| 777 | # For right-side filter in outer joins, NaN values fail the comparison |
| 778 | # and are filtered out (matching Ray Data behavior) |
| 779 | expected_pd = expected_pd[expected_pd["right_val"] < 1000] |
nothing calls this directly
no test coverage detected
searching dependent graphs…