(broker, cluster_config_timeout, async_task_kwargs)
| 311 | ), |
| 312 | ) |
def test_timeout(broker, cluster_config_timeout, async_task_kwargs):
    """Run a Sentinel against a queued task that sleeps past the stop timer.

    A ``time.sleep`` task of 5 seconds is queued on a dedicated list key and
    a timer sets ``stop_event`` after 3 seconds. After the ``Sentinel``
    constructor returns, the test expects the start event to be set, the
    Sentinel to report ``Conf.STOPPED`` and exactly one reincarnation.

    The queue is deleted in a ``finally`` block so that a failed assertion or
    an error raised while the Sentinel runs does not leave the test queue
    behind for later tests.

    Args:
        broker: broker fixture; its ``list_key`` is overwritten by this test.
        cluster_config_timeout: value passed to the Sentinel as ``timeout``.
        async_task_kwargs: extra keyword arguments forwarded to
            ``async_task``.
    """
    # set up the Sentinel
    broker.list_key = "timeout_test:q"
    broker.purge_queue()
    try:
        async_task("time.sleep", 5, broker=broker, **async_task_kwargs)
        start_event = Event()
        stop_event = Event()
        cluster_id = uuidlib.uuid4()
        # Set a timer to stop the Sentinel
        threading.Timer(3, stop_event.set).start()
        # NOTE(review): the assertions below assume the constructor only
        # returns once stop_event has been set -- confirm against Sentinel.
        s = Sentinel(
            stop_event,
            start_event,
            cluster_id=cluster_id,
            broker=broker,
            timeout=cluster_config_timeout,
        )
        assert start_event.is_set()
        assert s.status() == Conf.STOPPED
        # presumably the worker that exceeded the timeout was replaced once;
        # verify against the Sentinel implementation
        assert s.reincarnations == 1
    finally:
        # Always remove the test queue, even when an assertion above fails.
        broker.delete_queue()
| 334 | |
| 335 | |
| 336 | @pytest.mark.django_db |
nothing calls this directly
no test coverage detected