(s3, engine, s3so, metadata_file)
| 447 | @pytest.mark.filterwarnings("ignore:Dask annotations") |
| 448 | @pytest.mark.parametrize("metadata_file", [True, False]) |
| 449 | def test_parquet(s3, engine, s3so, metadata_file): |
| 450 | dd = pytest.importorskip("dask.dataframe") |
| 451 | pd = pytest.importorskip("pandas") |
| 452 | np = pytest.importorskip("numpy") |
| 453 | |
| 454 | url = f"s3://{test_bucket_name}/test.parquet" |
| 455 | data = pd.DataFrame( |
| 456 | { |
| 457 | "i32": np.arange(1000, dtype=np.int32), |
| 458 | "i64": np.arange(1000, dtype=np.int64), |
| 459 | "f": np.arange(1000, dtype=np.float64), |
| 460 | "bhello": np.random.choice(["hello", "you", "people"], size=1000).astype( |
| 461 | "O" |
| 462 | ), |
| 463 | }, |
| 464 | index=pd.Index(np.arange(1000), name="foo"), |
| 465 | ) |
| 466 | df = dd.from_pandas(data, chunksize=500) |
| 467 | df.to_parquet( |
| 468 | url, engine=engine, storage_options=s3so, write_metadata_file=metadata_file |
| 469 | ) |
| 470 | |
| 471 | files = [f.split("/")[-1] for f in s3.ls(url)] |
| 472 | if metadata_file: |
| 473 | assert "_common_metadata" in files |
| 474 | assert "_metadata" in files |
| 475 | assert "part.0.parquet" in files |
| 476 | |
| 477 | df2 = dd.read_parquet( |
| 478 | url, index="foo", calculate_divisions=True, engine=engine, storage_options=s3so |
| 479 | ) |
| 480 | assert len(df2.divisions) > 1 |
| 481 | |
| 482 | dd.utils.assert_eq(data, df2) |
| 483 | |
| 484 | # Check that `open_file_options` arguments are |
| 485 | # really passed through to fsspec |
| 486 | if fsspec_parquet: |
| 487 | # Passing `open_file_options` kwargs will fail |
| 488 | # if you set an unsupported engine |
| 489 | with pytest.raises(ValueError): |
| 490 | dd.read_parquet( |
| 491 | url, |
| 492 | engine=engine, |
| 493 | storage_options=s3so, |
| 494 | open_file_options={ |
| 495 | "precache_options": {"method": "parquet", "engine": "foo"}, |
| 496 | }, |
| 497 | ).compute() |
| 498 | |
| 499 | # ...but should work fine if you modify the |
| 500 | # maximum block-transfer size (max_block) |
| 501 | dd.read_parquet( |
| 502 | url, |
| 503 | engine=engine, |
| 504 | storage_options=s3so, |
| 505 | open_file_options={ |
| 506 | "precache_options": {"method": "parquet", "max_block": 8_000}, |
nothing calls this directly
no test coverage detected
searching dependent graphs…