hub / github.com/ray-project/ray / test_join_with_predicate_pushdown

Function test_join_with_predicate_pushdown

python/ray/data/tests/test_join.py:722–810

Test that predicate pushdown works correctly with different join types.

Filters on single-side predicates should push past the join when appropriate:
- Inner join: can push to either side
- Left outer: can push to left (preserved) side only
- Right outer: can push to right (preserved) side only

(
    ray_start_regular_shared_2_cpus, join_type, filter_side, should_push
)
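The pushdown rules in the description can be checked independently of Ray with plain pandas. The following is a minimal sketch, not the test's own logic: it compares "join, then filter" against "filter one input, then join" for each join type. The helper names (`filter_after_join`, `filter_before_join`, `pushdown_is_safe`) and the `PREDICATES` table are hypothetical and exist only for this illustration; the data and thresholds mirror those in the test.

```python
import pandas as pd

# Same shape as the test data: ids 0-31 with one value column per side.
left = pd.DataFrame({"id": list(range(32)), "left_val": [i * 10 for i in range(32)]})
right = pd.DataFrame({"id": list(range(32)), "right_val": [i * 100 for i in range(32)]})

# Hypothetical lookup: which column and bound each filter side uses.
PREDICATES = {"left": ("left_val", 100), "right": ("right_val", 1000)}


def filter_after_join(how, side):
    """Reference semantics: join first, then apply the filter."""
    column, bound = PREDICATES[side]
    joined = left.merge(right, on="id", how=how)
    return joined[joined[column] < bound]


def filter_before_join(how, side):
    """Pushed-down semantics: filter one input, then join."""
    column, bound = PREDICATES[side]
    l = left[left[column] < bound] if side == "left" else left
    r = right[right[column] < bound] if side == "right" else right
    return l.merge(r, on="id", how=how)


def pushdown_is_safe(how, side):
    """True when pushing the filter below the join leaves the rows unchanged."""
    after = filter_after_join(how, side)
    before = filter_before_join(how, side)
    return after.to_dict("records") == before.to_dict("records")


results = {
    (how, side): pushdown_is_safe(how, side)
    for how in ("inner", "left", "right")
    for side in ("left", "right")
}
for key, safe in results.items():
    print(key, "safe to push" if safe else "NOT safe to push")
```

In this sketch the unsafe cases are the ones where the filter targets the null-supplying side of an outer join: filtering that input first leaves unmatched rows from the preserved side in the output, whereas filtering after the join removes them.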

Source from the content-addressed store, hash-verified

    ids=["inner_left", "inner_right", "left_outer_left", "left_outer_right"],
)
def test_join_with_predicate_pushdown(
    ray_start_regular_shared_2_cpus, join_type, filter_side, should_push
):
    """Test that predicate pushdown works correctly with different join types.

    Filters on single-side predicates should push past the join when appropriate:
    - Inner join: can push to either side
    - Left outer: can push to left (preserved) side only
    - Right outer: can push to right (preserved) side only
    """
    from ray.data._internal.logical.optimizers import LogicalOptimizer
    from ray.data._internal.util import MiB
    from ray.data.expressions import col

    DataContext.get_current().target_max_block_size = 1 * MiB

    # Create datasets directly without map to allow filter pushdown through join
    # Both have ids 0-31 with different value columns
    left_data = [{"id": i, "left_val": i * 10} for i in range(32)]
    right_data = [{"id": i, "right_val": i * 100} for i in range(32)]

    left_ds = ray.data.from_items(left_data)
    right_ds = ray.data.from_items(right_data)

    # Join then filter
    joined = left_ds.join(
        right_ds,
        join_type=join_type,
        num_partitions=4,
        on=("id",),
        aggregator_ray_remote_args={"num_cpus": 0.01},
    )

    # Filter on column from specified side
    if filter_side == "left":
        filtered_ds = joined.filter(expr=col("left_val") < 100)
    else:
        filtered_ds = joined.filter(expr=col("right_val") < 1000)

    left_pd = left_ds.to_pandas()
    right_pd = right_ds.to_pandas()

    # Compute expected join result
    if join_type == "inner":
        expected_pd = left_pd.merge(right_pd, on="id", how="inner")
    elif join_type == "left_outer":
        expected_pd = left_pd.merge(right_pd, on="id", how="left")
    else:
        raise ValueError(f"Unsupported join type for this test: {join_type}")

    # Apply filter (must match what we filtered in Ray Data)
    if filter_side == "left":
        # For left-side filter, use notna() to include NaN rows from outer joins
        expected_pd = expected_pd[expected_pd["left_val"] < 100]
    else:
        # For right-side filter in outer joins, NaN values fail the comparison
        # and are filtered out (matching Ray Data behavior)
        expected_pd = expected_pd[expected_pd["right_val"] < 1000]
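The last comment in the listing relies on how pandas compares missing values: any ordering comparison against NaN evaluates to False, so rows with a missing `right_val` drop out of the boolean mask. A small standalone sketch of that behavior follows; the three-row frame is made up for illustration and is not part of the test.

```python
import numpy as np
import pandas as pd

# Illustrative frame: the middle row mimics an unmatched row from an outer join.
df = pd.DataFrame({"id": [0, 1, 2], "right_val": [0.0, np.nan, 2000.0]})

# NaN < 1000 is False, so the unmatched row is excluded along with the large value.
mask = df["right_val"] < 1000
kept = df[mask]

print(mask.tolist())
print(kept["id"].tolist())
```

Keeping unmatched rows would require an explicit null check in the predicate, for example combining the mask with `df["right_val"].isna()`.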

Callers

nothing calls this directly

Calls 11

col · Function · 0.90
rows_same · Function · 0.90
LogicalOptimizer · Class · 0.90
range · Function · 0.70
get_current · Method · 0.45
join · Method · 0.45
filter · Method · 0.45
to_pandas · Method · 0.45
merge · Method · 0.45
optimize · Method · 0.45
find · Method · 0.45

Tested by

no test coverage detected
