This method tests that concurrent requests don't corrupt each other's cache state. This is a regression test for the race condition in the original implementation.
(self)
| 135 | self.tearDown() |
| 136 | |
def test_concurrent_requests(self):
    """
    Send several Predict requests in parallel and check that each one answers.

    Intended as a regression test for a race condition in which concurrent
    requests could corrupt each other's cache state.

    After self.setUp(), the model is loaded through a channel to
    localhost:50051 and five prompts are submitted via a five-worker thread
    pool. The test passes only if every response has a non-empty message.
    Any exception, including a failed assertion, is printed and reported as
    a generic failure; self.tearDown() runs in every case.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Five distinct prompts, one per worker thread.
    prompts = [
        "The capital of France is",
        "The capital of Germany is",
        "The capital of Italy is",
        "The capital of Spain is",
        "The capital of Portugal is",
    ]

    try:
        self.setUp()
        with grpc.insecure_channel("localhost:50051") as conn:
            client = backend_pb2_grpc.BackendStub(conn)
            load_options = backend_pb2.ModelOptions(Model="mlx-community/Llama-3.2-1B-Instruct-4bit")
            load_reply = client.LoadModel(load_options)
            self.assertTrue(load_reply.success)

            def predict(text):
                # All workers share the same stub, so the calls overlap on one channel.
                options = backend_pb2.PredictOptions(Prompt=text, Tokens=20)
                return client.Predict(options)

            with ThreadPoolExecutor(max_workers=5) as pool:
                pending = [pool.submit(predict, text) for text in prompts]
                # Collect replies in completion order; result() re-raises worker errors.
                replies = []
                for finished in as_completed(pending):
                    replies.append(finished.result())

            # Every reply must carry a non-empty message.
            messages = [reply.message for reply in replies]
            every_reply_has_text = all(len(m) > 0 for m in messages)
            self.assertTrue(every_reply_has_text, "All requests should return non-empty responses")
            print(f"Concurrent test passed: {len(messages)} responses received")

    except Exception as err:
        print(err)
        self.fail("Concurrent requests test failed")
    finally:
        self.tearDown()
| 178 | |
| 179 | def test_cache_reuse(self): |
| 180 | """ |