(self)
| 247 | self.assertTrue(file_import_task.public) |
| 248 | |
| 249 | def test_backup(self): |
| 250 | client = APIClient() |
| 251 | |
| 252 | with start_processing_node(): |
| 253 | user = User.objects.get(username="testuser") |
| 254 | self.assertFalse(user.is_superuser) |
| 255 | project = Project.objects.create( |
| 256 | owner=user, |
| 257 | name="test backup" |
| 258 | ) |
| 259 | |
| 260 | image1 = open("app/fixtures/tiny_drone_image.jpg", 'rb') |
| 261 | image2 = open("app/fixtures/tiny_drone_image_2.jpg", 'rb') |
| 262 | |
| 263 | # Create processing node |
| 264 | pnode = ProcessingNode.objects.create(hostname="localhost", port=11223) |
| 265 | assign_perm('view_processingnode', user, pnode) |
| 266 | |
| 267 | client.login(username="testuser", password="test1234") |
| 268 | |
| 269 | # Create task |
| 270 | res = client.post("/api/projects/{}/tasks/".format(project.id), { |
| 271 | 'images': [image1, image2] |
| 272 | }, format="multipart") |
| 273 | image1.close() |
| 274 | image2.close() |
| 275 | task = Task.objects.get(id=res.data['id']) |
| 276 | |
| 277 | # Wait for completion |
| 278 | c = 0 |
| 279 | while c < 10: |
| 280 | worker.tasks.process_pending_tasks() |
| 281 | task.refresh_from_db() |
| 282 | if task.status == status_codes.COMPLETED: |
| 283 | break |
| 284 | c += 1 |
| 285 | time.sleep(1) |
| 286 | |
| 287 | # Assign some fields |
| 288 | task.name = "Backup test" |
| 289 | task.potree_scene = {'saved': True} |
| 290 | task.public = True |
| 291 | task.save() |
| 292 | |
| 293 | self.assertEqual(task.status, status_codes.COMPLETED) |
| 294 | |
| 295 | # Download task backup |
| 296 | task_uuid = task.uuid |
| 297 | res = client.get("/api/projects/{}/tasks/{}/backup".format(project.id, task.id)) |
| 298 | self.assertEqual(res.status_code, status.HTTP_200_OK) |
| 299 | |
| 300 | if not os.path.exists(settings.MEDIA_TMP): |
| 301 | os.mkdir(settings.MEDIA_TMP) |
| 302 | |
| 303 | assets_path = os.path.join(settings.MEDIA_TMP, "backup.zip") |
| 304 | |
| 305 | with open(assets_path, 'wb') as f: |
| 306 | f.write(b''.join(res.streaming_content)) |
nothing calls this directly
no test coverage detected