MCPcopy
hub / github.com/ray-project/ray / ArrowJSONDatasource

Class ArrowJSONDatasource

python/ray/data/_internal/datasource/json_datasource.py:35–161  ·  view source on GitHub ↗

JSON datasource, for reading and writing JSON and JSONL files.

Source from the content-addressed store, hash-verified

33
34
35class ArrowJSONDatasource(FileBasedDatasource):
36 """JSON datasource, for reading and writing JSON and JSONL files."""
37
def __init__(
    self,
    paths: Union[str, List[str]],
    *,
    arrow_json_args: Optional[Dict[str, Any]] = None,
    **file_based_datasource_kwargs,
):
    """Set up the datasource and split out the PyArrow read options.

    Args:
        paths: A single path or a list of paths, forwarded to the parent
            class constructor.
        arrow_json_args: Optional keyword arguments kept for the PyArrow
            JSON reader. A ``"read_options"`` entry, if present, is stored
            separately on ``self.read_options``; the remaining entries are
            stored on ``self.arrow_json_args``. The caller's dict is left
            unmodified.
        **file_based_datasource_kwargs: Extra keyword arguments forwarded
            unchanged to the parent class constructor.
    """
    from pyarrow import json

    super().__init__(paths, **file_based_datasource_kwargs)

    # Work on a shallow copy: popping "read_options" below must not remove
    # the key from a dict that the caller may still be holding and reusing.
    arrow_json_args = dict(arrow_json_args) if arrow_json_args is not None else {}

    # When no read options are supplied, fall back to a reader configured
    # with multi-threading disabled.
    self.read_options = arrow_json_args.pop(
        "read_options", json.ReadOptions(use_threads=False)
    )
    self.arrow_json_args = arrow_json_args
56
57 def _read_with_pyarrow_read_json(self, buffer: "pyarrow.lib.Buffer"):
58 """Read with PyArrow JSON reader, trying to auto-increase the
59 read block size in the case of the read object
60 straddling block boundaries."""
61 import pyarrow as pa
62 import pyarrow.json as pajson
63
64 # When reading large files, the default block size configured in PyArrow can be
65 # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid:
66 # straddling object straddles two block boundaries (try to increase block
67 # size?)`. More information on this issue can be found here:
68 # https://github.com/apache/arrow/issues/25674
69 # The read will be retried with geometrically increasing block size
70 # until the size reaches `DataContext.get_current().target_max_block_size`.
71 # The initial block size will start at the PyArrow default block size
72 # or it can be manually set through the `read_options` parameter as follows.
73 # >>> import pyarrow.json as pajson
74 # >>> block_size = 10 << 20 # Set block size to 10MB
75 # >>> ray.data.read_json( # doctest: +SKIP
76 # ... "s3://anonymous@ray-example-data/log.json",
77 # ... read_options=pajson.ReadOptions(block_size=block_size)
78 # ... )
79
80 init_block_size = self.read_options.block_size
81 max_block_size = DataContext.get_current().target_max_block_size
82 while True:
83 try:
84 yield pajson.read_json(
85 io.BytesIO(buffer),
86 read_options=self.read_options,
87 **self.arrow_json_args,
88 )
89 self.read_options.block_size = init_block_size
90 break
91 except pa.ArrowInvalid as e:
92 if "straddling object straddles two block boundaries" in str(e):

Callers 1

read_json · Function · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…