(self, request, pk=None)
| 16 | |
class TaskVolume(TaskView):
    """Endpoint that queues a volume computation against a task's DSM."""

    def post(self, request, pk=None):
        """Queue an asynchronous ``calc_volume`` run and return its id.

        The task is looked up (and access-checked) through
        ``get_and_check_task``. The request payload is validated with
        ``VolumeRequestSerializer`` (validation failures raise), after which
        its ``area`` and ``method`` fields are read.

        Returns a ``Response`` whose body is either
        ``{'celery_task_id': ...}`` on success or ``{'error': ...}`` when the
        task has no DSM extent or when scheduling the job raises. Both error
        paths answer with HTTP 200, so callers must inspect the body.
        """
        task = self.get_and_check_task(request, pk)
        if task.dsm_extent is None:
            return Response({'error': _('No surface model available. From the Dashboard, select this task, press Edit, from the options make sure to check "dsm", then press Restart --> From DEM.')})

        serializer = VolumeRequestSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        area_feature = serializer['area'].value
        base_method = serializer['method'].value
        # NOTE(review): index 0 is presumably the outer ring of a GeoJSON
        # polygon -- confirm against VolumeRequestSerializer.
        ring_points = list(area_feature['geometry']['coordinates'][0])
        dsm_path = os.path.abspath(task.get_asset_download_path("dsm.tif"))

        try:
            async_job = run_function_async(
                calc_volume,
                input_dem=dsm_path,
                pts=ring_points,
                pts_epsg=4326,
                base_method=base_method,
            )
            return Response(
                {'celery_task_id': async_job.task_id},
                status=status.HTTP_200_OK,
            )
        except Exception as exc:
            # Failures are reported in the body with a 200 status.
            return Response({'error': str(exc)}, status=status.HTTP_200_OK)
| 36 | |
| 37 | class TaskVolumeResult(GetTaskResult): |
| 38 | def get(self, request, pk=None, celery_task_id=None): |
nothing calls this directly
no test coverage detected