def make_spark_converter(
        df,
        parquet_row_group_size_bytes=DEFAULT_ROW_GROUP_SIZE_BYTES,
        compression_codec=None,
        dtype='float32'
):
    """Convert a Spark DataFrame into a :class:`SparkDatasetConverter` object.
    The dataframe is materialized to the directory specified by the
    Spark conf 'petastorm.spark.converter.parentCacheDirUrl'.
    The dataframe is materialized in parquet format; use
    `parquet_row_group_size_bytes` and `compression_codec` to control the
    parquet output. See the parameter documentation for details.

    The returned `SparkDatasetConverter` object holds the materialized
    dataframe and can be used to make one or more tensorflow datasets or
    torch dataloaders.

    The materialized dataframe data can be deleted explicitly (see
    `SparkDatasetConverter.delete`). When the Spark application exits,
    it also makes a best-effort attempt to delete the materialized data.

    :param df: The :class:`pyspark.sql.DataFrame` object to be converted,
        or a string path pointing to the directory that stores the dataframe data
        in parquet format. On Databricks runtime, the path must be a dbfs
        fuse path like 'file:/dbfs/xxx' or a dbfs path like 'dbfs:/xxx'.
    :param parquet_row_group_size_bytes: An int denoting the number of bytes
        in a parquet row group when materializing the dataframe.
    :param compression_codec: The compression codec to use.
        It can be one of 'uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate'.
        Default ``None``. If ``None``, the data is left uncompressed.
    :param dtype: ``None``, 'float32' or 'float64', specifying the precision of the floating-point
        elements in the output dataset. Integer types will remain unchanged. If ``None``, all types
        will remain unchanged. Default 'float32'.

    :return: a :class:`SparkDatasetConverter` object that holds the
        materialized dataframe and can be used to make one or more tensorflow
        datasets or torch dataloaders.
    """

    parent_cache_dir_url = _get_parent_cache_dir_url()

    if isinstance(df, str):
        # `df` is a path to data that is already stored as parquet.
        dataset_dir_url = df
        if 'DATABRICKS_RUNTIME_VERSION' in os.environ:
            dataset_dir_url = _normalize_databricks_dbfs_url(
                dataset_dir_url,
                "On databricks runtime, if `df` argument is a string, it must be a dbfs "
                "fuse path like 'file:/dbfs/xxx' or a dbfs path like 'dbfs:/xxx'."
            )
    else:
        # TODO: Improve default behavior to be automatically choosing the best way.
        compression_codec = compression_codec or "uncompressed"

        # The codec name is validated case-insensitively.
        if compression_codec.lower() not in \
                ['uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate']:
            raise RuntimeError(
                "compression_codec should be None or one of the following values: "
                "'uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate'")
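
The codec handling at the end of the listing can be tried in isolation. The following is a minimal sketch, assuming only the behavior visible in the listing: `None` falls back to "uncompressed", the comparison is case-insensitive, and an unknown codec raises `RuntimeError`. The names `SUPPORTED_CODECS` and `resolve_compression_codec` are hypothetical and are not part of petastorm.

```python
# Codec names accepted by the check in make_spark_converter,
# copied from the listing.
SUPPORTED_CODECS = ['uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate']


def resolve_compression_codec(compression_codec=None):
    """Hypothetical helper mirroring the codec check in the listing.

    Returns the codec to use, or raises RuntimeError for an unknown name.
    """
    # None (or an empty string) falls back to "uncompressed".
    codec = compression_codec or "uncompressed"

    # Only the comparison is lowercased; the value itself is returned
    # unchanged, as in the listing.
    if codec.lower() not in SUPPORTED_CODECS:
        raise RuntimeError(
            "compression_codec should be None or one of the following values: "
            "'uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate'")
    return codec


if __name__ == "__main__":
    print(resolve_compression_codec())        # default codec
    print(resolve_compression_codec("GZIP"))  # accepted, case preserved
    try:
        resolve_compression_codec("zstd")     # not in the accepted list
    except RuntimeError as err:
        print("rejected:", err)
```

Pulling the check into a small function like this makes the default and the error path easy to unit test without a Spark session. Whether the real function is structured this way is not shown in the listing.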