github.com/ray-project/ray

Method streaming_split

python/ray/data/dataset.py:2116–2216

Returns ``n`` ``DataIterator`` objects that can be used to read disjoint subsets of the dataset in parallel. This method is the recommended way to consume Datasets for distributed training. Streaming split works by delegating the execution of this Dataset to a coordinator actor, which divides the resulting blocks among the ``n`` output iterators.

(
        self,
        n: int,
        *,
        equal: bool = False,
        locality_hints: Optional[List["NodeIdStr"]] = None,
    )
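The contract in the summary (``n`` iterators over disjoint subsets) and the ``equal`` flag in the signature can be illustrated with a small pure-Python model. `model_streaming_split` is a hypothetical helper, not Ray code. It assumes, as Ray's API reference describes for ``equal``, that ``equal=True`` gives every iterator the same number of rows and drops any remainder; how whole blocks are assigned when ``equal=False`` is an illustrative choice, not Ray's actual balancing logic.

```python
from typing import List, Sequence


def model_streaming_split(
    blocks: Sequence[Sequence[int]], n: int, equal: bool = False
) -> List[List[int]]:
    """Toy model of dividing one stream of blocks among ``n`` consumers.

    Hypothetical helper for illustration only: it is not part of Ray and does
    not reproduce Ray's actual balancing logic.
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    outputs: List[List[int]] = [[] for _ in range(n)]
    if not equal:
        # Deal out whole blocks, each to the consumer holding the fewest rows.
        # Every row lands in exactly one output (disjoint subsets), but row
        # counts can differ by up to the size of the largest block.
        for block in blocks:
            target = min(range(n), key=lambda i: len(outputs[i]))
            outputs[target].extend(block)
        return outputs
    # equal=True (assumed semantics): every consumer gets exactly total // n
    # rows, which may mean cutting blocks apart; the remainder is dropped.
    rows = [row for block in blocks for row in block]
    per_output = len(rows) // n
    for i in range(n):
        outputs[i] = rows[i * per_output : (i + 1) * per_output]
    return outputs


if __name__ == "__main__":
    # Three uneven blocks holding rows 0..100 (101 rows in total).
    blocks = [list(range(0, 30)), list(range(30, 70)), list(range(70, 101))]
    print([len(out) for out in model_streaming_split(blocks, 2)])  # [61, 40]
    print([len(out) for out in model_streaming_split(blocks, 2, equal=True)])  # [50, 50]
```

With ``equal=False`` no row is lost but the two shares are uneven; with ``equal=True`` the shares match and the single leftover row (100) is dropped.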


@ConsumptionAPI
@PublicAPI(api_group=SMJ_API_GROUP)
def streaming_split(
    self,
    n: int,
    *,
    equal: bool = False,
    locality_hints: Optional[List["NodeIdStr"]] = None,
) -> List[DataIterator]:
    """Returns ``n`` :class:`DataIterators <ray.data.DataIterator>` that can
    be used to read disjoint subsets of the dataset in parallel.

    This method is the recommended way to consume :class:`Datasets <Dataset>` for
    distributed training.

    Streaming split works by delegating the execution of this :class:`Dataset` to a
    coordinator actor. The coordinator pulls block references from the executed
    stream, and divides those blocks among ``n`` output iterators. Iterators pull
    blocks from the coordinator actor to return to their caller on ``next``.

    The returned iterators are also repeatable; each iteration will trigger a
    new execution of the Dataset. There is an implicit barrier at the start of
    each iteration, which means that `next` must be called on all iterators before
    the iteration starts.

    .. warning::

        Because iterators are pulling blocks from the same :class:`Dataset`
        execution, if one iterator falls behind, other iterators may be stalled.

    Examples:

        .. testcode::

            import ray

            ds = ray.data.range(100)
            it1, it2 = ds.streaming_split(2, equal=True)

        Consume data from iterators in parallel.

        .. testcode::

            @ray.remote
            def consume(it):
                for batch in it.iter_batches():
                    pass

            ray.get([consume.remote(it1), consume.remote(it2)])

        You can loop over the iterators multiple times (multiple epochs).

        .. testcode::

            @ray.remote
            def train(it):
                NUM_EPOCHS = 2
                for _ in range(NUM_EPOCHS):
                    for batch in it.iter_batches():
                        pass
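The coordinator, the per-iteration barrier, and the stall described in the docstring can be modeled in a single process. `ToyCoordinator`, `begin_epoch`, `try_next`, and the `STALLED`/`DONE` markers are hypothetical names invented for this sketch; it is not Ray's implementation and ignores actors, object references, and `locality_hints`. It assumes blocks from one execution are dealt out round-robin and that a fixed cap (`max_buffered`) limits how many assigned-but-unread blocks may wait, standing in for whatever backpressure Ray actually applies.

```python
from collections import deque
from typing import Callable, Deque, Iterable, Iterator, List, Optional, Set, Union

STALLED = "stalled"  # the caller would have to wait
DONE = "done"  # this consumer's share of the current epoch is finished


class ToyCoordinator:
    """Single-process model of a coordinator shared by n split iterators.

    Hypothetical class for illustration only. Assumptions:
      * blocks from ONE execution of the dataset are dealt out round-robin;
      * at most `max_buffered` assigned-but-unread blocks are held in total;
      * a new epoch (re-execution) starts only after all n consumers ask.
    """

    def __init__(
        self, make_stream: Callable[[], Iterable[int]], n: int, max_buffered: int = 2
    ) -> None:
        self._make_stream = make_stream
        self._n = n
        self._max_buffered = max_buffered
        self._ready: Set[int] = set()  # consumers waiting at the epoch barrier
        self._stream: Optional[Iterator[int]] = None  # current execution
        self._buffers: List[Deque[int]] = [deque() for _ in range(n)]
        self._target = 0  # round-robin pointer
        self.epoch = 0

    def begin_epoch(self, i: int) -> None:
        """Consumer i asks for a new epoch; it starts once all n have asked."""
        self._ready.add(i)
        if len(self._ready) == self._n:
            # Barrier passed: re-execute the "dataset" from scratch.
            self._ready.clear()
            self._stream = iter(self._make_stream())
            self._buffers = [deque() for _ in range(self._n)]
            self._target = 0
            self.epoch += 1

    def try_next(self, i: int) -> Union[int, str]:
        """Return consumer i's next block, STALLED, or DONE (never blocks)."""
        if self._stream is None or i in self._ready:
            return STALLED  # still waiting at the barrier
        while not self._buffers[i]:
            if sum(len(b) for b in self._buffers) >= self._max_buffered:
                # Unread blocks for a lagging consumer fill the cap, so the
                # shared stream cannot advance for anyone.
                return STALLED
            block = next(self._stream, None)
            if block is None:
                return DONE
            # Assign the block to whichever consumer is next in rotation.
            self._buffers[self._target].append(block)
            self._target = (self._target + 1) % self._n
        return self._buffers[i].popleft()


if __name__ == "__main__":
    coord = ToyCoordinator(lambda: range(6), n=2, max_buffered=2)
    coord.begin_epoch(0)
    coord.begin_epoch(1)  # barrier passes: epoch 1 starts
    print([coord.try_next(0) for _ in range(3)])  # [0, 2, 'stalled']
```

In the demo, consumer 1 never reads, so blocks assigned to it fill the cap and consumer 0 stalls after two blocks, which is the situation the warning describes. Calling `begin_epoch` plays the role of the first `next` call on each iterator: the stream is re-created only after every consumer has made that call.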

Calls (5)

StreamingSplit · class · 0.90
LogicalPlan · class · 0.90
_from_parent · method · 0.80
_set_uuid · method · 0.80
create · method · 0.45