(
ray_start_regular_shared, tmp_path, target_max_block_size_infinite_or_default
)
| 21 | |
| 22 | |
def test_csv_read(
    ray_start_regular_shared, tmp_path, target_max_block_size_infinite_or_default
):
    """Read one, two, and three CSV files and verify contents plus metadata.

    Every case compares the dataset's rows, sorted by ("one", "two"), against
    the source DataFrames. The single-file case also checks ``count()``,
    ``input_files()`` and ``schema()``. The two-file case checks that each
    executed block's recorded ``size_bytes`` equals the size of the
    materialized block.
    """

    def _write_csv(frame, filename):
        # Write ``frame`` under tmp_path without the pandas index column and
        # hand back the file path as a string.
        csv_path = os.path.join(tmp_path, filename)
        frame.to_csv(csv_path, index=False)
        return csv_path

    def _check_contents(dataset, expected):
        # Sort before comparing so the check does not depend on read order.
        # Cast ``expected`` to the dataset's dtypes so only values are compared.
        actual = (
            dataset.to_pandas()
            .sort_values(by=["one", "two"])
            .reset_index(drop=True)
        )
        pd.testing.assert_frame_equal(expected.astype(actual.dtypes.to_dict()), actual)

    # Single file.
    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    path1 = _write_csv(df1, "test1.csv")
    ds = ray.data.read_csv(path1, partitioning=None)
    _check_contents(ds, df1)
    # Metadata ops on the single-file dataset.
    assert ds.count() == 3
    assert ds.input_files() == [_unwrap_protocol(path1)]
    assert ds.schema() == Schema(
        pa.schema([("one", pa.int64()), ("two", pa.string())])
    )

    # Two files, override_num_blocks=2.
    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
    path2 = _write_csv(df2, "test2.csv")
    ds = ray.data.read_csv([path1, path2], override_num_blocks=2, partitioning=None)
    _check_contents(ds, pd.concat([df1, df2], ignore_index=True))
    # Each block's metadata must report the real size of the materialized block.
    for entry in ds._execute().blocks:
        # pyrefly: ignore[no-matching-overload]
        accessor = BlockAccessor.for_block(ray.get(entry.ref))
        assert accessor.size_bytes() == entry.metadata.size_bytes

    # Three files, override_num_blocks=2.
    df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]})
    path3 = _write_csv(df3, "test3.csv")
    ds = ray.data.read_csv(
        [path1, path2, path3],
        override_num_blocks=2,
        partitioning=None,
    )
    _check_contents(ds, pd.concat([df1, df2, df3], ignore_index=True))
| 66 | |
| 67 | |
| 68 | def test_csv_write( |
nothing calls this directly
no test coverage detected
searching dependent graphs…