MCPcopy
hub / github.com/ray-project/ray / test_csv_read

Function test_csv_read

python/ray/data/tests/datasource/test_csv.py:23–65  ·  view source on GitHub ↗
(
    ray_start_regular_shared, tmp_path, target_max_block_size_infinite_or_default
)

Source from the content-addressed store, hash-verified

21
22
def test_csv_read(
    ray_start_regular_shared, tmp_path, target_max_block_size_infinite_or_default
):
    """Read CSV files with ``ray.data.read_csv`` and compare against pandas.

    Three scenarios are exercised:

    * a single file, also checking row count, input files, and schema;
    * two files with ``override_num_blocks=2``, also checking that each
      block's recorded ``size_bytes`` equals the size measured from the
      block itself;
    * three files with ``override_num_blocks=2``.
    """

    def write_csv(file_name, frame):
        # Write ``frame`` under tmp_path without the pandas index column and
        # return the resulting path.
        csv_path = os.path.join(tmp_path, file_name)
        frame.to_csv(csv_path, index=False)
        return csv_path

    def sorted_frame(dataset):
        # Sort so the comparison does not depend on the order in which rows
        # come back from the dataset.
        return (
            dataset.to_pandas()
            .sort_values(by=["one", "two"])
            .reset_index(drop=True)
        )

    def check_matches(expected, actual):
        # Cast the expected frame to the dtypes of the frame read back by Ray
        # so that the comparison focuses on values.
        pd.testing.assert_frame_equal(
            expected.astype(actual.dtypes.to_dict()), actual
        )

    # Single file.
    first = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    first_path = write_csv("test1.csv", first)
    dataset = ray.data.read_csv(first_path, partitioning=None)
    check_matches(first, sorted_frame(dataset))
    # Metadata operations on the single-file dataset.
    assert dataset.count() == 3
    assert dataset.input_files() == [_unwrap_protocol(first_path)]
    assert dataset.schema() == Schema(
        pa.schema([("one", pa.int64()), ("two", pa.string())])
    )

    # Two files, override_num_blocks=2.
    second = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
    second_path = write_csv("test2.csv", second)
    dataset = ray.data.read_csv(
        [first_path, second_path], override_num_blocks=2, partitioning=None
    )
    actual = sorted_frame(dataset)
    check_matches(pd.concat([first, second], ignore_index=True), actual)
    # Each block's recorded size must agree with the size measured from the
    # fetched block.
    for block_entry in dataset._execute().blocks:
        # pyrefly: ignore[no-matching-overload]
        block = BlockAccessor.for_block(ray.get(block_entry.ref))
        assert block.size_bytes() == block_entry.metadata.size_bytes

    # Three files, override_num_blocks=2.
    third = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]})
    third_path = write_csv("test3.csv", third)
    dataset = ray.data.read_csv(
        [first_path, second_path, third_path],
        override_num_blocks=2,
        partitioning=None,
    )
    expected = pd.concat([first, second, third], ignore_index=True)
    check_matches(expected, sorted_frame(dataset))
66
67
68def test_csv_write(

Callers

nothing calls this directly

Calls 14

_unwrap_protocol · Function · 0.90
Schema · Class · 0.90
astype · Method · 0.80
get · Method · 0.65
join · Method · 0.45
to_pandas · Method · 0.45
to_dict · Method · 0.45
count · Method · 0.45
input_files · Method · 0.45
schema · Method · 0.45
concat · Method · 0.45
_execute · Method · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…