Wait for async operations to complete.
(self)
| 564 | self._async_stack.append(t) |
| 565 | |
def wait(self) -> None:
    """Wait for pending async uploads to finish, then clean up.

    Returns immediately when ``self.async_mode`` is falsy or when
    ``self.async_task_peeding`` is falsy.

    Otherwise:

    * If ``self._async_loop`` is set, ``self._sync_tasks()`` is run to
      completion on it.
    * If ``self._exception_list`` holds any ``(error_msg, file_id)`` entries,
      every failure is logged and a single ``RuntimeError`` describing all of
      them is raised. In that case the temporary folder, the bookkeeping
      containers and the pending flag are left untouched.
    * On success, the temporary folder is removed and the bookkeeping is
      reset. Then, on the rank selected by ``gpc.is_rank_for_log()``, the
      upload counter is incremented. A ``<latest_save_step>.step`` marker is
      also saved synchronously into ``self.latest_save_folder``.

    Raises:
        RuntimeError: if any async upload recorded an error.
    """
    if not self.async_mode:
        return

    if not self.async_task_peeding:
        return

    if self._async_loop:
        self._async_loop.run_until_complete(self._sync_tasks())

    if self._exception_list:
        # Log every failure (not just one) before raising, so no error is lost.
        failures = []
        for error_msg, file_id in self._exception_list:
            failed_path = self._to_be_del_files[file_id]
            logger.error(
                f"Node:{socket.gethostname()}, Error: Checkpoint {failed_path} "
                f"failed on step {self.upload_count}: {error_msg}"
            )
            failures.append(f"{failed_path}: {error_msg}")

        # TODO: Re-upload in sync mode
        raise RuntimeError(
            f"Failed to upload {len(failures)} checkpoint file(s) on step {self.upload_count}: "
            + "; ".join(failures)
        )

    self._del_tmp_folder()
    self._exception_list.clear()
    self._to_be_del_files.clear()
    self.async_task_peeding = False

    if gpc.is_rank_for_log():
        self.upload_count += 1
        # async_mode is known to be truthy here (guarded at the top), so no
        # re-check is needed before writing the step marker.
        self.save(
            os.path.join(self.latest_save_folder, f"{self.latest_save_step}.step"),
            saved_obj={"step": self.latest_save_step},
            async_upload=False,
        )
| 603 | |
| 604 | |
# Module-level StorageManager handle. It starts as None here; presumably it is
# assigned a StorageManager instance elsewhere before use (verify against
# callers). The quoted annotation is never evaluated at runtime.
storage_manager: "StorageManager | None" = None
no test coverage detected