| 11 | |
@pytest.mark.django_db
def test_admin_views(admin_client, monkeypatch):
    """Smoke-test the django_q admin pages and the ``retry_failed`` action.

    Creates one schedule, one failed task, one successful task and one ORM
    queue entry, requests every admin URL reversed for them and expects an
    HTTP 200 response. Afterwards the ``retry_failed`` admin action is posted
    for the failed task; the test expects a redirect (302) and expects the
    failure record to no longer exist.
    """
    # Point the configuration's ORM setting at the "default" database alias
    # for the duration of this test.
    monkeypatch.setattr(Conf, "ORM", "default")

    s = schedule("schedule.test")

    # uuid() results are used as: index 0 -> task name, index 1 -> task id.
    fail_tag = uuid()
    f = Task.objects.create(
        id=fail_tag[1],
        name=fail_tag[0],
        func="test.fail",
        started=timezone.now(),
        stopped=timezone.now(),
        success=False,
    )
    success_tag = uuid()
    t = Task.objects.create(
        id=success_tag[1],
        name=success_tag[0],
        func="test.success",
        started=timezone.now(),
        stopped=timezone.now(),
        success=True,
    )
    q = OrmQ.objects.create(
        key="test",
        payload=SignedPackage.dumps({"id": 1, "func": "test", "name": "test"}),
    )

    admin_urls = (
        # schedule
        reverse("admin:django_q_schedule_changelist"),
        reverse("admin:django_q_schedule_add"),
        reverse("admin:django_q_schedule_change", args=(s.id,)),
        reverse("admin:django_q_schedule_history", args=(s.id,)),
        reverse("admin:django_q_schedule_delete", args=(s.id,)),
        # success
        reverse("admin:django_q_success_changelist"),
        reverse("admin:django_q_success_change", args=(t.id,)),
        reverse("admin:django_q_success_history", args=(t.id,)),
        reverse("admin:django_q_success_delete", args=(t.id,)),
        # failure
        reverse("admin:django_q_failure_changelist"),
        reverse("admin:django_q_failure_change", args=(f.id,)),
        reverse("admin:django_q_failure_history", args=(f.id,)),
        reverse("admin:django_q_failure_delete", args=(f.id,)),
        # orm queue
        reverse("admin:django_q_ormq_changelist"),
        reverse("admin:django_q_ormq_change", args=(q.id,)),
        reverse("admin:django_q_ormq_history", args=(q.id,)),
        reverse("admin:django_q_ormq_delete", args=(q.id,)),
    )
    for url in admin_urls:
        response = admin_client.get(url)
        # Include the URL so a failing page is identifiable from the report.
        assert response.status_code == 200, url

    # resubmit the failure
    url = reverse("admin:django_q_failure_changelist")
    data = {"action": "retry_failed", "_selected_action": [f.pk]}
    response = admin_client.post(url, data)
    assert response.status_code == 302
    # Look the record up by primary key: the previous check filtered ``name``
    # by the task's id, which is not expected to match any row, so it passed
    # regardless of what the action did.
    assert not Failure.objects.filter(pk=f.pk).exists()