MCPcopy
hub / github.com/ray-project/ray / _run_map_operator_test

Function _run_map_operator_test

python/ray/data/tests/test_map_operator.py:46–96  ·  view source on GitHub ↗

Shared test function for MapOperator output unbundling tests.

(
    ray_start_regular_shared,
    use_actors,
    preserve_order,
    transform_fn,
    output_block_size_option,
    expected_blocks,
    test_name="TestMapper",
)

Source from the content-addressed store, hash-verified

44
45
46def _run_map_operator_test(
47 ray_start_regular_shared,
48 use_actors,
49 preserve_order,
50 transform_fn,
51 output_block_size_option,
52 expected_blocks,
53 test_name="TestMapper",
54):
55 """Shared test function for MapOperator output unbundling tests."""
56 # Create with inputs.
57 input_op = InputDataBuffer(
58 DataContext.get_current(), make_ref_bundles([[i] for i in range(10)])
59 )
60 compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
61
62 transformer = create_map_transformer_from_block_fn(
63 transform_fn,
64 output_block_size_option=output_block_size_option,
65 )
66
67 op = MapOperator.create(
68 transformer,
69 input_op=input_op,
70 data_context=DataContext.get_current(),
71 name=test_name,
72 compute_strategy=compute_strategy,
73 # Send everything in a single bundle of 10 blocks.
74 min_rows_per_bundle=10,
75 )
76
77 # Feed data and block on exec.
78 op.start(ExecutionOptions(preserve_order=preserve_order))
79 if use_actors:
80 # Wait for actors to be ready before adding inputs.
81 run_op_tasks_sync(op, only_existing=True)
82
83 while input_op.has_next():
84 assert op.can_add_input()
85 op.add_input(input_op.get_next(), 0)
86
87 op.all_inputs_done()
88
89 run_op_tasks_sync(op)
90
91 # Check that bundles are unbundled in the output queue.
92 outputs = []
93 while op.has_next():
94 outputs.append(op.get_next())
95 assert len(outputs) == expected_blocks
96 assert op.has_completed()
97
98
99@pytest.mark.parametrize("use_actors", [False, True])

Calls 15

has_nextMethod · 0.95
InputDataBufferClass · 0.90
make_ref_bundlesFunction · 0.90
ActorPoolStrategyClass · 0.90
TaskPoolStrategyClass · 0.90
ExecutionOptionsClass · 0.90
run_op_tasks_syncFunction · 0.90
rangeFunction · 0.70
get_currentMethod · 0.45
createMethod · 0.45
startMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…