(broker, monkeypatch)
| 472 | |
@pytest.mark.django_db
def test_attempt_count(broker, monkeypatch):
    """Check that each repeated save of a failed task raises ``attempt_count``.

    ``Conf.MAX_ATTEMPTS`` is patched to 3, then the same unsuccessful task
    is passed to ``save_task`` three times.  The stored ``attempt_count`` is
    expected to read 1, 2 and 3 in turn, and the broker queue is expected to
    be empty after the final save.
    """
    monkeypatch.setattr(Conf, "MAX_ATTEMPTS", 3)
    tag = uuid()
    # uuid() is indexed here as a pair: element 1 is used as the task id,
    # element 0 as the task name.
    task = {
        "id": tag[1],
        "name": tag[0],
        "func": "math.copysign",
        "args": (1, -1),
        "kwargs": {},
        "started": timezone.now(),
        "stopped": timezone.now(),
        "success": False,
        "result": None,
    }

    # initial save - no success
    save_task(task, broker)
    assert Task.objects.filter(id=task["id"]).exists()
    saved_task = Task.objects.get(id=task["id"])
    assert saved_task.attempt_count == 1

    # NOTE(review): presumably this pause makes the next "stopped" timestamp
    # clearly later than the first one - confirm whether it is still needed.
    sleep(0.5)

    # second save - same task with a fresh "stopped" timestamp
    task["stopped"] = timezone.now()
    save_task(task, broker)
    saved_task = Task.objects.get(id=task["id"])
    assert saved_task.attempt_count == 2

    # third save - attempt_count now equals the patched MAX_ATTEMPTS
    task["stopped"] = timezone.now()
    save_task(task, broker)
    saved_task = Task.objects.get(id=task["id"])
    assert saved_task.attempt_count == 3

    # task should be removed from queue
    assert broker.queue_size() == 0
| 507 | |
| 508 | |
| 509 | @pytest.mark.django_db |
nothing calls this directly
no test coverage detected