(shutdown_only, target_max_block_size_infinite_or_default)
| 71 | |
| 72 | |
| 73 | def test_callable_classes(shutdown_only, target_max_block_size_infinite_or_default): |
| 74 | ray.init(num_cpus=2) |
| 75 | ds = ray.data.range(10, override_num_blocks=10) |
| 76 | |
| 77 | class StatefulFn: |
| 78 | def __init__(self): |
| 79 | self.num_reuses = 0 |
| 80 | |
| 81 | def __call__(self, x): |
| 82 | r = self.num_reuses |
| 83 | self.num_reuses += 1 |
| 84 | return {"id": np.array([r])} |
| 85 | |
| 86 | # map |
| 87 | actor_reuse = ds.map(StatefulFn, concurrency=1).take() |
| 88 | assert sorted(extract_values("id", actor_reuse)) == [ |
| 89 | [v] for v in list(range(10)) |
| 90 | ], actor_reuse |
| 91 | |
| 92 | class StatefulFn: |
| 93 | def __init__(self): |
| 94 | self.num_reuses = 0 |
| 95 | |
| 96 | def __call__(self, x): |
| 97 | r = self.num_reuses |
| 98 | self.num_reuses += 1 |
| 99 | return [{"id": r}] |
| 100 | |
| 101 | # flat map |
| 102 | actor_reuse = extract_values("id", ds.flat_map(StatefulFn, concurrency=1).take()) |
| 103 | assert sorted(actor_reuse) == list(range(10)), actor_reuse |
| 104 | |
| 105 | class StatefulFn: |
| 106 | def __init__(self): |
| 107 | self.num_reuses = 0 |
| 108 | |
| 109 | def __call__(self, x): |
| 110 | r = self.num_reuses |
| 111 | self.num_reuses += 1 |
| 112 | return {"id": np.array([r])} |
| 113 | |
| 114 | # map batches |
| 115 | actor_reuse = extract_values( |
| 116 | "id", |
| 117 | ds.map_batches(StatefulFn, batch_size=1, concurrency=1).take(), |
| 118 | ) |
| 119 | assert sorted(actor_reuse) == list(range(10)), actor_reuse |
| 120 | |
| 121 | class StatefulFn: |
| 122 | def __init__(self): |
| 123 | self.num_reuses = 0 |
| 124 | |
| 125 | def __call__(self, x): |
| 126 | r = self.num_reuses |
| 127 | self.num_reuses += 1 |
| 128 | return r > 0 |
| 129 | |
| 130 | # filter |
nothing calls this directly
no test coverage detected
searching dependent graphs…