(
ray_start_regular_shared, target_max_block_size_infinite_or_default
)
| 946 | |
| 947 | |
def test_warn_large_udfs(
    ray_start_regular_shared, target_max_block_size_infinite_or_default
):
    """Check that an oversized UDF produces the "too large" warning.

    The driver script below is executed through ``run_string_as_driver``.
    It defines a callable class whose instance holds an ``int8`` NumPy array
    with one more element than ``MapOperator.MAP_UDF_WARN_SIZE_THRESHOLD``,
    runs it through ``map_batches`` on a one-row dataset, and asserts inside
    the script that the data comes back unchanged. The test then checks that
    the value returned by ``run_string_as_driver`` contains the warning text.

    Both parameters are pytest fixtures; they are requested for their setup
    only and are not referenced in the body.
    """
    # Message fragment that must appear in the driver's output.
    expected_warning = "The UDF of operator MapBatches(LargeUDF) is too large"

    # The script is kept flush-left because it is run verbatim as its own
    # Python program; indenting it here would change the program text.
    driver_script = """
import ray
import numpy as np
from ray.data._internal.execution.operators.map_operator import MapOperator

large_object = np.zeros(MapOperator.MAP_UDF_WARN_SIZE_THRESHOLD + 1, dtype=np.int8)

class LargeUDF:
    def __init__(self):
        self.data = large_object

    def __call__(self, batch):
        return batch

ds = ray.data.range(1)
ds = ds.map_batches(LargeUDF, concurrency=1)
assert ds.take_all() == [{"id": 0}]
"""

    driver_output = run_string_as_driver(driver_script)
    assert expected_warning in driver_output
| 971 | |
| 972 | |
| 973 | # NOTE: All tests above share a Ray cluster, while the tests below do not. These |
nothing calls this directly
no test coverage detected
searching dependent graphs…