(broker, cluster_config_timeout, async_task_kwargs)
| 344 | ), |
| 345 | ) |
| 346 | def test_timeout_task_finishes(broker, cluster_config_timeout, async_task_kwargs): |
| 347 | # set up the Sentinel |
| 348 | broker.list_key = "timeout_test:q" |
| 349 | broker.purge_queue() |
| 350 | async_task("time.sleep", 3, broker=broker, **async_task_kwargs) |
| 351 | start_event = Event() |
| 352 | stop_event = Event() |
| 353 | cluster_id = uuidlib.uuid4() |
| 354 | # Set a timer to stop the Sentinel |
| 355 | threading.Timer(6, stop_event.set).start() |
| 356 | s = Sentinel( |
| 357 | stop_event, |
| 358 | start_event, |
| 359 | cluster_id=cluster_id, |
| 360 | broker=broker, |
| 361 | timeout=cluster_config_timeout, |
| 362 | ) |
| 363 | assert start_event.is_set() |
| 364 | assert s.status() == Conf.STOPPED |
| 365 | assert s.reincarnations == 0 |
| 366 | broker.delete_queue() |
| 367 | |
| 368 | |
| 369 | @pytest.mark.django_db |
nothing calls this directly
no test coverage detected