
Function make_spark_converter

petastorm/spark/spark_dataset_converter.py:664–736  ·  github.com/uber/petastorm

Convert a Spark DataFrame into a :class:`SparkDatasetConverter` object. The DataFrame is materialized in Parquet format under the directory set by the Spark conf 'petastorm.spark.converter.parentCacheDirUrl'; the row group size and compression codec of the Parquet files can be set with `parquet_row_group_size_bytes` and `compression_codec`.
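The converter's lifecycle, as the docstring describes it, is: materialize the data once under the parent cache directory, reuse that copy for one or more datasets or dataloaders, then delete it explicitly (`SparkDatasetConverter.delete`) or rely on a best-effort cleanup when the application exits. The plain-Python sketch below illustrates only that pattern. It is not petastorm's implementation: `MaterializedCache`, its methods, the per-materialization subdirectory naming, and the use of `atexit` for exit-time cleanup are all hypothetical, and a local temporary directory stands in for the configured cache URL.

```python
import atexit
import os
import shutil
import tempfile
import uuid


class MaterializedCache:
    """Hypothetical owner of one materialized copy of a dataset on disk."""

    def __init__(self, parent_cache_dir):
        # Each materialization gets its own uniquely named subdirectory.
        self.path = os.path.join(parent_cache_dir, uuid.uuid4().hex)
        os.makedirs(self.path)
        # Best-effort cleanup at interpreter exit, in case delete() is never called.
        atexit.register(self._cleanup_quietly)

    def write_part(self, name, payload):
        # Stand-in for writing one Parquet file; here it is just raw bytes.
        with open(os.path.join(self.path, name), "wb") as f:
            f.write(payload)

    def list_parts(self):
        # Several readers (datasets, dataloaders) can reuse the same files.
        return sorted(os.listdir(self.path))

    def delete(self):
        # Explicit deletion; the exit-time hook is no longer needed afterwards.
        atexit.unregister(self._cleanup_quietly)
        shutil.rmtree(self.path)

    def _cleanup_quietly(self):
        # "Best effort": errors such as an already-removed directory are ignored.
        shutil.rmtree(self.path, ignore_errors=True)


if __name__ == "__main__":
    parent = tempfile.mkdtemp()  # stands in for the parent cache directory URL
    cache = MaterializedCache(parent)
    cache.write_part("part-00000.parquet", b"not really parquet")
    print(cache.list_parts())          # ['part-00000.parquet']
    cache.delete()
    print(os.path.exists(cache.path))  # False
    os.rmdir(parent)
```

Registering the exit hook at creation time and unregistering it on explicit deletion keeps the two cleanup paths from racing over the same directory.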

(
        df,
        parquet_row_group_size_bytes=DEFAULT_ROW_GROUP_SIZE_BYTES,
        compression_codec=None,
        dtype='float32'
)
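The `dtype` parameter affects only floating-point data: per its documentation in the docstring, 'float32' or 'float64' sets the precision of floating-point elements, integer types stay unchanged, and ``None`` leaves every type unchanged. The sketch below applies that rule to a hypothetical schema modeled as a dict from column name to a type-name string. `apply_dtype`, the type-name strings, and the "array<elem>" notation are illustrative assumptions, as is the choice to convert the element type of array columns.

```python
FLOAT_TYPES = {"float32", "float64"}


def apply_dtype(schema, dtype="float32"):
    """Return a copy of `schema` with floating-point types set to `dtype`.

    `schema` is a hypothetical dict of column name -> type name, where array
    columns are written as "array<elem>" (an illustrative convention).
    """
    if dtype is None:
        # None: every column keeps its original type.
        return dict(schema)
    if dtype not in FLOAT_TYPES:
        raise ValueError("dtype should be None, 'float32' or 'float64'")

    def convert(type_name):
        # Arrays: convert the element type and keep the array wrapper.
        if type_name.startswith("array<") and type_name.endswith(">"):
            return "array<" + convert(type_name[len("array<"):-1]) + ">"
        # Floating-point scalars take the requested precision.
        if type_name in FLOAT_TYPES:
            return dtype
        # Integers and all other types are left unchanged.
        return type_name

    return {name: convert(t) for name, t in schema.items()}


schema = {"features": "array<float64>", "weight": "float64",
          "label": "int64", "id": "string"}
print(apply_dtype(schema))
# {'features': 'array<float32>', 'weight': 'float32', 'label': 'int64', 'id': 'string'}
```

With the default 'float32', double-precision columns are narrowed while `label` and `id` pass through untouched; passing ``None`` returns the schema as-is.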


def make_spark_converter(
        df,
        parquet_row_group_size_bytes=DEFAULT_ROW_GROUP_SIZE_BYTES,
        compression_codec=None,
        dtype='float32'
):
    """Convert a spark dataframe into a :class:`SparkDatasetConverter` object.
    It will materialize a spark dataframe to the directory specified by
    spark conf 'petastorm.spark.converter.parentCacheDirUrl'.
    The dataframe will be materialized in parquet format, and we can specify
    `parquet_row_group_size_bytes` and `compression_codec` for the parquet
    format. See params documentation for details.

    The returned `SparkDatasetConverter` object will hold the materialized
    dataframe, and can be used to make one or more tensorflow datasets or
    torch dataloaders.

    We can explicitly delete the materialized dataframe data, see
    `SparkDatasetConverter.delete`, and when the spark application exits,
    it will make a best-effort attempt to delete the materialized dataframe data.

    :param df: The :class:`pyspark.sql.DataFrame` object to be converted,
        or a string path pointing to the directory that stores the dataframe data
        in parquet format. On databricks runtime, the path must be a dbfs
        fuse path like 'file:/dbfs/xxx' or a dbfs path like 'dbfs:/xxx'.
    :param parquet_row_group_size_bytes: An int denoting the number of bytes
        in a parquet row group when materializing the dataframe.
    :param compression_codec: Specify compression codec.
        It can be one of 'uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate'.
        Default ``None``. If ``None``, it will leave the data uncompressed.
    :param dtype: ``None``, 'float32' or 'float64', specifying the precision of the floating-point
        elements in the output dataset. Integer types will remain unchanged. If ``None``, all types
        will remain unchanged. Default 'float32'.

    :return: a :class:`SparkDatasetConverter` object that holds the
        materialized dataframe and can be used to make one or more tensorflow
        datasets or torch dataloaders.
    """

    parent_cache_dir_url = _get_parent_cache_dir_url()

    if isinstance(df, str):
        dataset_dir_url = df
        if 'DATABRICKS_RUNTIME_VERSION' in os.environ:
            dataset_dir_url = _normalize_databricks_dbfs_url(
                dataset_dir_url,
                "On databricks runtime, if `df` argument is a string, it must be a dbfs "
                "fuse path like 'file:/dbfs/xxx' or a dbfs path like 'dbfs:/xxx'."
            )
    else:
        # TODO: Improve default behavior to be automatically choosing the best way.
        compression_codec = compression_codec or "uncompressed"

        if compression_codec.lower() not in \
                ['uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate']:
            raise RuntimeError(
                "compression_codec should be None or one of the following values: "
                "'uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate'")
        # ... (remainder of the function body not shown)
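The listing above branches on the type of `df`: a path string is accepted as-is (and checked against the dbfs forms on Databricks), while a DataFrame gets a default codec of "uncompressed" followed by a case-insensitive check against the allowed codec names. The sketch below restates that decision logic as standalone functions so it can be exercised without Spark. `prepare_arguments` and `normalize_dbfs_url` are hypothetical names, and rewriting 'dbfs:/xxx' into the fuse form 'file:/dbfs/xxx' is an assumption made for illustration; the real `_normalize_databricks_dbfs_url` helper may accept or produce different forms.

```python
import os

VALID_CODECS = ['uncompressed', 'bzip2', 'gzip', 'lz4', 'snappy', 'deflate']


def normalize_dbfs_url(url, err_msg):
    """Hypothetical helper: accept 'file:/dbfs/...' or 'dbfs:/...'.

    Rewriting 'dbfs:/xxx' to the fuse form 'file:/dbfs/xxx' is an assumption.
    """
    if url.startswith("file:/dbfs/"):
        return url
    # 'dbfs:/xxx' with a single slash (not the 'dbfs://authority' form).
    if url.startswith("dbfs:/") and not url.startswith("dbfs://"):
        return "file:/dbfs/" + url[len("dbfs:/"):]
    raise ValueError(err_msg)


def prepare_arguments(df, compression_codec=None, on_databricks=None):
    """Mirror the two branches of the listing; returns a (kind, value) pair."""
    if on_databricks is None:
        # Same environment check that the listing uses.
        on_databricks = 'DATABRICKS_RUNTIME_VERSION' in os.environ
    if isinstance(df, str):
        # A path to existing Parquet data: only the URL is checked here.
        url = df
        if on_databricks:
            url = normalize_dbfs_url(
                url,
                "On databricks runtime, if `df` argument is a string, it must be a dbfs "
                "fuse path like 'file:/dbfs/xxx' or a dbfs path like 'dbfs:/xxx'.")
        return ("path", url)
    # A DataFrame to materialize: default to no compression, then validate
    # case-insensitively against the allowed codec names.
    codec = compression_codec or "uncompressed"
    if codec.lower() not in VALID_CODECS:
        raise RuntimeError(
            "compression_codec should be None or one of the following values: "
            + ", ".join(repr(c) for c in VALID_CODECS))
    return ("dataframe", codec)
```

For example, `prepare_arguments("dbfs:/data/train", on_databricks=True)` returns `("path", "file:/dbfs/data/train")`, while any non-string argument combined with the codec "zstd" raises `RuntimeError`, matching the error type used in the listing.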

Callers (15)

run (function) · 0.90
run (function) · 0.90
test_tf_autograph (function) · 0.90
test_primitive (function) · 0.90
test_array_field (function) · 0.90
test_delete (function) · 0.90
test_compression (function) · 0.90
test_df_caching (function) · 0.90
test_pickling_remotely (function) · 0.90

Tested by (15)

test_tf_autograph (function) · 0.72
test_primitive (function) · 0.72
test_array_field (function) · 0.72
test_delete (function) · 0.72
test_compression (function) · 0.72
test_df_caching (function) · 0.72
test_pickling_remotely (function) · 0.72
test_dtype (function) · 0.72
test_array (function) · 0.72
