(broker, monkeypatch)
| 368 | |
@pytest.mark.django_db
def test_recycle(broker, monkeypatch):
    """Exercise worker recycling under a Sentinel and the result save limit.

    The test runs in three phases:

    1. Queue three tasks, set ``Conf.RECYCLE`` to 2 and ``Conf.WORKERS`` to 1,
       and run a ``Sentinel`` that a timer stops after three seconds. The
       sentinel must report ``Conf.STOPPED`` with exactly one reincarnation.
    2. Queue two more tasks, push them onto a task queue and run ``worker``
       directly; two results are expected on the result queue.
    3. Set ``Conf.SAVE_LIMIT`` to 1 and run ``monitor`` over those results;
       the number of stored ``Success`` rows must equal the limit.

    The broker queue is deleted in a ``finally`` clause so that a failing
    assertion does not leave tasks behind under the test's list key.
    """
    task_path = "django_q.tests.tasks.multiply"
    # set up the Sentinel
    broker.list_key = "test_recycle_test:q"
    try:
        for _ in range(3):
            async_task(task_path, 2, 2, broker=broker)
        start_event = Event()
        stop_event = Event()
        cluster_id = uuidlib.uuid4()
        # override settings
        monkeypatch.setattr(Conf, "RECYCLE", 2)
        monkeypatch.setattr(Conf, "WORKERS", 1)
        # set a timer to stop the Sentinel
        threading.Timer(3, stop_event.set).start()
        # NOTE(review): presumably the constructor blocks until stop_event is
        # set, which is why the status can be asserted right after - confirm.
        s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker)
        assert start_event.is_set()
        assert s.status() == Conf.STOPPED
        assert s.reincarnations == 1
        for _ in range(2):
            async_task(task_path, 2, 2, broker=broker)
        task_queue = Queue()
        result_queue = Queue()
        # push two tasks
        for _ in range(2):
            pusher(task_queue, stop_event, broker=broker)
        # worker should exit on recycle
        worker(task_queue, result_queue, Value("f", -1))
        # check if the work has been done
        assert result_queue.qsize() == 2
        # save_limit test
        monkeypatch.setattr(Conf, "SAVE_LIMIT", 1)
        result_queue.put("STOP")
        # run monitor
        monitor(result_queue)
        assert Success.objects.count() == Conf.SAVE_LIMIT
    finally:
        # always clean up, even when an assertion above fails
        broker.delete_queue()
| 406 | |
| 407 | |
| 408 | @pytest.mark.django_db |
nothing calls this directly
no test coverage detected