(ray_start_regular_shared, use_actors)
| 98 | |
| 99 | @pytest.mark.parametrize("use_actors", [False, True]) |
| 100 | def test_map_operator_streamed(ray_start_regular_shared, use_actors): |
| 101 | # Create with inputs. |
| 102 | input_op = InputDataBuffer( |
| 103 | DataContext.get_current(), |
| 104 | make_ref_bundles([[np.ones(1024) * i] for i in range(100)]), |
| 105 | ) |
| 106 | compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy() |
| 107 | op = MapOperator.create( |
| 108 | _mul2_map_data_prcessor, |
| 109 | input_op, |
| 110 | DataContext.get_current(), |
| 111 | name="TestMapper", |
| 112 | compute_strategy=compute_strategy, |
| 113 | ) |
| 114 | |
| 115 | # Feed data and implement streaming exec. |
| 116 | output = [] |
| 117 | # Use preserve_order so output order matches input order (required for |
| 118 | # actor pool, which otherwise returns results in completion order). |
| 119 | op.start(ExecutionOptions(actor_locality_enabled=True, preserve_order=True)) |
| 120 | |
| 121 | if use_actors: |
| 122 | # Wait for actors to be ready before adding inputs. |
| 123 | run_op_tasks_sync(op, only_existing=True) |
| 124 | |
| 125 | while input_op.has_next(): |
| 126 | # If actor pool at capacity run 1 task and allow it to copmlete |
| 127 | while not op.can_add_input(): |
| 128 | run_one_op_task(op) |
| 129 | |
| 130 | op.add_input(input_op.get_next(), 0) |
| 131 | |
| 132 | # Complete ingesting inputs |
| 133 | op.all_inputs_done() |
| 134 | run_op_tasks_sync(op) |
| 135 | |
| 136 | assert op.has_execution_finished() |
| 137 | # NOTE: Op is not considered completed until its outputs are drained |
| 138 | assert not op.has_completed() |
| 139 | |
| 140 | # Fetch all outputs |
| 141 | while op.has_next(): |
| 142 | ref = op.get_next() |
| 143 | assert ref.owns_blocks, ref |
| 144 | _get_blocks(ref, output) |
| 145 | |
| 146 | assert op.has_completed() |
| 147 | |
| 148 | expected = [[np.ones(1024) * i * 2] for i in range(100)] |
| 149 | output_sorted = sorted(output, key=lambda x: np.asarray(x[0]).flat[0]) |
| 150 | expected_sorted = sorted(expected, key=lambda x: np.asarray(x[0]).flat[0]) |
| 151 | assert np.array_equal(output_sorted, expected_sorted) |
| 152 | metrics = op.metrics.as_dict() |
| 153 | assert metrics["obj_store_mem_freed"] == pytest.approx(832200, 0.5), metrics |
| 154 | if use_actors: |
| 155 | assert "locality_hits" in metrics, metrics |
| 156 | assert "locality_misses" in metrics, metrics |
| 157 | else: |
nothing calls this directly
no test coverage detected
searching dependent graphs…