This method contains the logic for processing tasks asynchronously from a background thread or from a worker. Here tasks that are ready to be processed execute some logic. This could be communication with a processing node or executing a pending action.
(self)
| 681 | self.save() |
| 682 | |
| 683 | def process(self): |
| 684 | """ |
| 685 | This method contains the logic for processing tasks asynchronously |
| 686 | from a background thread or from a worker. Here tasks that are |
| 687 | ready to be processed execute some logic. This could be communication |
| 688 | with a processing node or executing a pending action. |
| 689 | """ |
| 690 | |
| 691 | try: |
| 692 | if self.pending_action == pending_actions.IMPORT: |
| 693 | self.handle_import() |
| 694 | |
| 695 | if self.pending_action == pending_actions.RESIZE: |
| 696 | resized_images = self.resize_images() |
| 697 | self.refresh_from_db() |
| 698 | self.resize_gcp(resized_images) |
| 699 | self.pending_action = None |
| 700 | self.save() |
| 701 | |
| 702 | if self.auto_processing_node and not self.status in [status_codes.FAILED, status_codes.CANCELED]: |
| 703 | # No processing node assigned and need to auto assign |
| 704 | if self.processing_node is None: |
| 705 | # Assign first online node with lowest queue count |
| 706 | self.processing_node = ProcessingNode.find_best_available_node(self.project.owner) |
| 707 | if self.processing_node: |
| 708 | self.processing_node.queue_count += 1 # Doesn't have to be accurate, it will get overridden later |
| 709 | self.processing_node.save() |
| 710 | |
| 711 | logger.info("Automatically assigned processing node {} to {}".format(self.processing_node, self)) |
| 712 | self.save() |
| 713 | |
| 714 | # Processing node assigned, but is offline and no errors |
| 715 | if self.processing_node and not self.processing_node.is_online(): |
| 716 | # If we are queued up |
| 717 | # detach processing node, and reassignment |
| 718 | # will be processed at the next tick |
| 719 | if self.status == status_codes.QUEUED: |
| 720 | logger.info("Processing node {} went offline, reassigning {}...".format(self.processing_node, self)) |
| 721 | self.uuid = '' |
| 722 | self.processing_node = None |
| 723 | self.status = None |
| 724 | self.save() |
| 725 | |
| 726 | elif self.status == status_codes.RUNNING: |
| 727 | # Task was running and processing node went offline |
| 728 | # It could have crashed due to low memory |
| 729 | # or perhaps it went offline due to network errors. |
| 730 | # We can't easily differentiate between the two, so we need |
| 731 | # to notify the user because if it crashed due to low memory |
| 732 | # the user might need to take action (or be stuck in an infinite loop) |
| 733 | raise NodeServerError("Processing node went offline. This could be due to insufficient memory or a network error.") |
| 734 | |
| 735 | if self.processing_node: |
| 736 | # Need to process some images (UUID not yet set and task doesn't have pending actions)? |
| 737 | if not self.uuid and self.pending_action is None and self.status is None: |
| 738 | |
| 739 | logger.info("Processing... {}".format(self)) |
| 740 |
no test coverage detected