Returns ``n`` :class:`DataIterators <ray.data.DataIterator>` that can be used to read disjoint subsets of the dataset in parallel. This method is the recommended way to consume :class:`Datasets <Dataset>` for distributed training. Streaming split works by delegating the execution of this :class:`Dataset` to a coordinator actor.
streaming_split(
    self,
    n: int,
    *,
    equal: bool = False,
    locality_hints: Optional[List["NodeIdStr"]] = None,
) -> List[DataIterator]
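The coordinator model in the summary can be illustrated with a toy sketch that uses only the standard library. This is not Ray's implementation: `ToyCoordinator`, `next_block`, `consume_split`, and `run_toy_split` are hypothetical names, plain Python lists stand in for block references, and the round-robin assignment is an assumption chosen for simplicity. The sketch only shows the core idea that one coordinator owns the stream and hands each block to exactly one of `n` consumers, so the consumers read disjoint subsets.

```python
import threading
from collections import deque
from typing import Deque, Iterable, Iterator, List, Optional

Block = List[int]  # a plain list stands in for a block reference


class ToyCoordinator:
    """Hypothetical stand-in for the coordinator actor (not Ray's code)."""

    def __init__(self, blocks: Iterable[Block], n: int) -> None:
        self._source: Iterator[Block] = iter(blocks)
        # One pending-block queue per output split.
        self._queues: List[Deque[Block]] = [deque() for _ in range(n)]
        self._next_target = 0
        self._lock = threading.Lock()

    def next_block(self, split_idx: int) -> Optional[Block]:
        """Return the next block for one split, or None when exhausted."""
        with self._lock:
            # Pull from the shared stream until this split has a block queued.
            # Round-robin assignment is an assumption of this sketch. Note that
            # this toy buffers blocks for slow consumers without bound, whereas
            # the real iterators may stall each other.
            while not self._queues[split_idx]:
                block = next(self._source, None)
                if block is None:
                    return None
                self._queues[self._next_target].append(block)
                self._next_target = (self._next_target + 1) % len(self._queues)
            return self._queues[split_idx].popleft()


def consume_split(coordinator: ToyCoordinator, split_idx: int, out: List[int]) -> None:
    """Drain one split, appending every row it receives to `out`."""
    while True:
        block = coordinator.next_block(split_idx)
        if block is None:
            break
        out.extend(block)


def run_toy_split(num_rows: int = 100, block_size: int = 10, n: int = 2) -> List[List[int]]:
    """Split range(num_rows) into blocks and consume them from n threads."""
    blocks = [
        list(range(start, min(start + block_size, num_rows)))
        for start in range(0, num_rows, block_size)
    ]
    coordinator = ToyCoordinator(blocks, n)
    outputs: List[List[int]] = [[] for _ in range(n)]
    threads = [
        threading.Thread(target=consume_split, args=(coordinator, i, outputs[i]))
        for i in range(n)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outputs
```

With the default arguments the ten blocks alternate between the two consumers, so each ends up with 50 rows and no row appears in both outputs. The sketch does not model repeated epochs or the barrier at the start of each iteration.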
@ConsumptionAPI
@PublicAPI(api_group=SMJ_API_GROUP)
def streaming_split(
    self,
    n: int,
    *,
    equal: bool = False,
    locality_hints: Optional[List["NodeIdStr"]] = None,
) -> List[DataIterator]:
    """Returns ``n`` :class:`DataIterators <ray.data.DataIterator>` that can
    be used to read disjoint subsets of the dataset in parallel.

    This method is the recommended way to consume :class:`Datasets <Dataset>` for
    distributed training.

    Streaming split works by delegating the execution of this :class:`Dataset` to a
    coordinator actor. The coordinator pulls block references from the executed
    stream, and divides those blocks among ``n`` output iterators. Iterators pull
    blocks from the coordinator actor to return to their caller on ``next``.

    The returned iterators are also repeatable; each iteration will trigger a
    new execution of the Dataset. There is an implicit barrier at the start of
    each iteration, which means that ``next`` must be called on all iterators before
    the iteration starts.

    .. warning::

        Because iterators are pulling blocks from the same :class:`Dataset`
        execution, if one iterator falls behind, other iterators may be stalled.

    Examples:

        .. testcode::

            import ray

            ds = ray.data.range(100)
            it1, it2 = ds.streaming_split(2, equal=True)

        Consume data from iterators in parallel.

        .. testcode::

            @ray.remote
            def consume(it):
                for batch in it.iter_batches():
                    pass

            ray.get([consume.remote(it1), consume.remote(it2)])

        You can loop over the iterators multiple times (multiple epochs).

        .. testcode::

            @ray.remote
            def train(it):
                NUM_EPOCHS = 2
                for _ in range(NUM_EPOCHS):
                    for batch in it.iter_batches():
                        pass