| 14 | pass |
| 15 | |
def read(self, path):
    """Read the parquet file at ``path`` into a numpy array.

    If the file's Arrow schema metadata carries a ``b"shape"`` entry, the
    resulting array is reshaped to that shape. Otherwise the array keeps the
    2-D shape produced from the table.

    A file with exactly one column whose Arrow type is a list type (``list``,
    ``large_list`` or ``fixed_size_list``) is treated as "one vector per row",
    the layout produced by Spark ML feature processing. Those per-row vectors
    are stacked with ``np.array`` instead of converting the table column-wise.

    Args:
        path: Location of the parquet file. It is passed unchanged to
            ``pyarrow.parquet.read_metadata`` and ``pyarrow.parquet.read_table``.

    Returns:
        numpy.ndarray: The file's data, reshaped to the stored shape if present.

    Raises:
        ValueError: If the ``shape`` metadata is present but is not a
            Python literal sequence (e.g. ``"(2, 3)"`` or ``"[2, 3]"``).
    """
    import ast  # local import: safe literal parsing of the shape metadata

    logging.debug("Reading from %s using parquet format", path)
    schema_metadata = (
        pyarrow.parquet.read_metadata(path).schema.to_arrow_schema().metadata
    )

    # As parquet data are tabularized, we assume the dim of ndarray is 2.
    # If not, it should be explicitly specified in the file as metadata.
    raw_shape = schema_metadata.get(b"shape", None) if schema_metadata else None
    table = pyarrow.parquet.read_table(path, memory_map=True)

    data_types = table.schema.types
    # Spark ML feature processing produces single-column parquet files where
    # each row is a vector object. Accept every Arrow list flavour, not just
    # the plain ``list`` type.
    is_vector_column = len(data_types) == 1 and (
        pyarrow.types.is_list(data_types[0])
        or pyarrow.types.is_large_list(data_types[0])
        or pyarrow.types.is_fixed_size_list(data_types[0])
    )
    if is_vector_column:
        arr = np.array(table.to_pandas().iloc[:, 0].to_list())
        logging.debug(
            "Parquet data under %s converted from single vector per row to ndarray",
            path,
        )
    else:
        arr = table.to_pandas().to_numpy()

    if raw_shape:
        # The shape string comes from the file itself, which may be untrusted.
        # Parse it with ast.literal_eval rather than eval() so no code runs.
        try:
            shape = tuple(ast.literal_eval(raw_shape.decode()))
        except (ValueError, SyntaxError, TypeError) as err:
            raise ValueError(
                f"Invalid shape metadata {raw_shape!r} in parquet file {path}"
            ) from err
    else:
        logging.debug(
            "Shape information not found in the metadata, read the data as "
            "a 2 dim array."
        )
        shape = arr.shape

    result = arr.reshape(shape)
    logging.debug("Done reading from %s", path)
    return result
| 46 | |
| 47 | def write(self, path, array, vector_rows=False): |
| 48 | logging.debug("Writing to %s using parquet format" % path) |