(self)
| 27 | clear_test_media_root() |
| 28 | |
| 29 | def test_task(self): |
| 30 | client = APIClient() |
| 31 | |
| 32 | with start_processing_node(): |
| 33 | user = User.objects.get(username="testuser") |
| 34 | self.assertFalse(user.is_superuser) |
| 35 | project = Project.objects.create( |
| 36 | owner=user, |
| 37 | name="test project" |
| 38 | ) |
| 39 | |
| 40 | image1 = open("app/fixtures/tiny_drone_image.jpg", 'rb') |
| 41 | image2 = open("app/fixtures/tiny_drone_image_2.jpg", 'rb') |
| 42 | |
| 43 | # Create processing node |
| 44 | pnode = ProcessingNode.objects.create(hostname="localhost", port=11223) |
| 45 | assign_perm('view_processingnode', user, pnode) |
| 46 | client.login(username="testuser", password="test1234") |
| 47 | |
| 48 | # Create task |
| 49 | res = client.post("/api/projects/{}/tasks/".format(project.id), { |
| 50 | 'images': [image1, image2] |
| 51 | }, format="multipart") |
| 52 | image1.close() |
| 53 | image2.close() |
| 54 | task = Task.objects.get(id=res.data['id']) |
| 55 | |
| 56 | # Wait for completion |
| 57 | c = 0 |
| 58 | while c < 10: |
| 59 | worker.tasks.process_pending_tasks() |
| 60 | task.refresh_from_db() |
| 61 | if task.status == status_codes.COMPLETED: |
| 62 | break |
| 63 | c += 1 |
| 64 | time.sleep(1) |
| 65 | |
| 66 | |
| 67 | self.assertEqual(task.status, status_codes.COMPLETED) |
| 68 | |
| 69 | if not os.path.exists(settings.MEDIA_TMP): |
| 70 | os.mkdir(settings.MEDIA_TMP) |
| 71 | |
| 72 | # EPT assets should be there |
| 73 | res = client.get("/api/projects/{}/tasks/{}/assets/entwine_pointcloud/ept.json".format(project.id, task.id)) |
| 74 | self.assertEqual(res.status_code, status.HTTP_200_OK) |
| 75 | |
| 76 | # Try removing it |
| 77 | shutil.rmtree(task.assets_path("entwine_pointcloud")) |
| 78 | |
| 79 | res = client.get("/api/projects/{}/tasks/{}/assets/entwine_pointcloud/ept.json".format(project.id, task.id)) |
| 80 | self.assertEqual(res.status_code, status.HTTP_404_NOT_FOUND) |
| 81 | |
| 82 | # Rebuild it |
| 83 | self.assertTrue(task.check_ept()) |
| 84 | |
| 85 | # EPT available again |
| 86 | res = client.get("/api/projects/{}/tasks/{}/assets/entwine_pointcloud/ept.json".format(project.id, task.id)) |
nothing calls this directly
no test coverage detected