JSON datasource, for reading and writing JSON and JSONL files.
| 33 | |
| 34 | |
| 35 | class ArrowJSONDatasource(FileBasedDatasource): |
| 36 | """JSON datasource, for reading and writing JSON and JSONL files.""" |
| 37 | |
def __init__(
    self,
    paths: Union[str, List[str]],
    *,
    arrow_json_args: Optional[Dict[str, Any]] = None,
    **file_based_datasource_kwargs,
):
    """Create a datasource that reads JSON / JSONL files with PyArrow.

    Args:
        paths: A single path or a list of paths to read.
        arrow_json_args: Extra keyword arguments for
            ``pyarrow.json.read_json``. An optional ``"read_options"`` entry
            is extracted and stored on ``self.read_options``; the remaining
            entries are stored on ``self.arrow_json_args`` and forwarded to
            the reader. The caller's dict is left unmodified.
        **file_based_datasource_kwargs: Forwarded unchanged to the
            ``FileBasedDatasource`` constructor.
    """
    from pyarrow import json

    super().__init__(paths, **file_based_datasource_kwargs)

    # Work on a shallow copy: popping "read_options" directly from the
    # caller's dict would strip it for any other datasource built from the
    # same dict.
    arrow_json_args = dict(arrow_json_args) if arrow_json_args is not None else {}

    read_options = arrow_json_args.pop("read_options", None)
    if read_options is None:
        # No (or an explicit None) read_options given: use a single-threaded
        # ReadOptions. This is built only when needed, and an explicit None
        # no longer leaks through as a non-ReadOptions value.
        read_options = json.ReadOptions(use_threads=False)
    # NOTE(review): the read path adjusts `self.read_options.block_size` in
    # place, so a caller-supplied ReadOptions object is shared and mutated;
    # consider copying it if that matters to callers.
    self.read_options = read_options
    self.arrow_json_args = arrow_json_args
| 56 | |
| 57 | def _read_with_pyarrow_read_json(self, buffer: "pyarrow.lib.Buffer"): |
| 58 | """Read with PyArrow JSON reader, trying to auto-increase the |
| 59 | read block size in the case of the read object |
| 60 | straddling block boundaries.""" |
| 61 | import pyarrow as pa |
| 62 | import pyarrow.json as pajson |
| 63 | |
| 64 | # When reading large files, the default block size configured in PyArrow can be |
| 65 | # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid: |
| 66 | # straddling object straddles two block boundaries (try to increase block |
| 67 | # size?)`. More information on this issue can be found here: |
| 68 | # https://github.com/apache/arrow/issues/25674 |
| 69 | # The read will be retried with geometrically increasing block size |
| 70 | # until the size reaches `DataContext.get_current().target_max_block_size`. |
| 71 | # The initial block size will start at the PyArrow default block size |
| 72 | # or it can be manually set through the `read_options` parameter as follows. |
| 73 | # >>> import pyarrow.json as pajson |
| 74 | # >>> block_size = 10 << 20 # Set block size to 10MB |
| 75 | # >>> ray.data.read_json( # doctest: +SKIP |
| 76 | # ... "s3://anonymous@ray-example-data/log.json", |
| 77 | # ... read_options=pajson.ReadOptions(block_size=block_size) |
| 78 | # ... ) |
| 79 | |
| 80 | init_block_size = self.read_options.block_size |
| 81 | max_block_size = DataContext.get_current().target_max_block_size |
| 82 | while True: |
| 83 | try: |
| 84 | yield pajson.read_json( |
| 85 | io.BytesIO(buffer), |
| 86 | read_options=self.read_options, |
| 87 | **self.arrow_json_args, |
| 88 | ) |
| 89 | self.read_options.block_size = init_block_size |
| 90 | break |
| 91 | except pa.ArrowInvalid as e: |
| 92 | if "straddling object straddles two block boundaries" in str(e): |
no outgoing calls
no test coverage detected
searching dependent graphs…