MCPcopy Index your code
hub / github.com/ray-project/ray / test_map_operator_streamed

Function test_map_operator_streamed

python/ray/data/tests/test_map_operator.py:100–159  ·  view source on GitHub ↗
(ray_start_regular_shared, use_actors)

Source from the content-addressed store, hash-verified

98
99@pytest.mark.parametrize("use_actors", [False, True])
100def test_map_operator_streamed(ray_start_regular_shared, use_actors):
101 # Create with inputs.
102 input_op = InputDataBuffer(
103 DataContext.get_current(),
104 make_ref_bundles([[np.ones(1024) * i] for i in range(100)]),
105 )
106 compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
107 op = MapOperator.create(
108 _mul2_map_data_prcessor,
109 input_op,
110 DataContext.get_current(),
111 name="TestMapper",
112 compute_strategy=compute_strategy,
113 )
114
115 # Feed data and implement streaming exec.
116 output = []
117 # Use preserve_order so output order matches input order (required for
118 # actor pool, which otherwise returns results in completion order).
119 op.start(ExecutionOptions(actor_locality_enabled=True, preserve_order=True))
120
121 if use_actors:
122 # Wait for actors to be ready before adding inputs.
123 run_op_tasks_sync(op, only_existing=True)
124
125 while input_op.has_next():
126 # If actor pool at capacity run 1 task and allow it to copmlete
127 while not op.can_add_input():
128 run_one_op_task(op)
129
130 op.add_input(input_op.get_next(), 0)
131
132 # Complete ingesting inputs
133 op.all_inputs_done()
134 run_op_tasks_sync(op)
135
136 assert op.has_execution_finished()
137 # NOTE: Op is not considered completed until its outputs are drained
138 assert not op.has_completed()
139
140 # Fetch all outputs
141 while op.has_next():
142 ref = op.get_next()
143 assert ref.owns_blocks, ref
144 _get_blocks(ref, output)
145
146 assert op.has_completed()
147
148 expected = [[np.ones(1024) * i * 2] for i in range(100)]
149 output_sorted = sorted(output, key=lambda x: np.asarray(x[0]).flat[0])
150 expected_sorted = sorted(expected, key=lambda x: np.asarray(x[0]).flat[0])
151 assert np.array_equal(output_sorted, expected_sorted)
152 metrics = op.metrics.as_dict()
153 assert metrics["obj_store_mem_freed"] == pytest.approx(832200, 0.5), metrics
154 if use_actors:
155 assert "locality_hits" in metrics, metrics
156 assert "locality_misses" in metrics, metrics
157 else:

Callers

nothing calls this directly

Calls 15

has_next · Method · 0.95
InputDataBuffer · Class · 0.90
make_ref_bundles · Function · 0.90
ActorPoolStrategy · Class · 0.90
TaskPoolStrategy · Class · 0.90
ExecutionOptions · Class · 0.90
run_op_tasks_sync · Function · 0.90
run_one_op_task · Function · 0.90
_get_blocks · Function · 0.90
range · Function · 0.70
get_current · Method · 0.45

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…