MCPcopy Index your code
hub / github.com/InternLM/InternLM / wait

Method wait

internlm/utils/storage_manager.py:566–602  ·  view source on GitHub ↗

Wait for async operations to complete.

(self)

Source from the content-addressed store, hash-verified

564 self._async_stack.append(t)
565
def wait(self) -> None:
    """Wait for pending async uploads to finish, then finalize the step.

    Does nothing when ``async_mode`` is off or when no async task is pending
    (``async_task_peeding`` is falsy). Otherwise it:

    1. Runs ``_sync_tasks()`` to completion on ``_async_loop`` (if one is set).
    2. If ``_exception_list`` is non-empty, logs *every* recorded failure and
       then raises for the first one.
    3. Calls ``_del_tmp_folder()``, clears ``_exception_list`` and
       ``_to_be_del_files``, and resets ``async_task_peeding``.
    4. On the rank selected by ``gpc.is_rank_for_log()``, increments
       ``upload_count`` and synchronously saves ``<latest_save_step>.step``
       (containing the step number) into ``latest_save_folder``.

    Returns:
        None on every path; this method has never produced a value.

    Raises:
        RuntimeError: If at least one entry is present in ``_exception_list``
            after the async tasks have been synced. Steps 3 and 4 are skipped
            in that case, so the failure bookkeeping is left untouched.
    """
    # Nothing to wait for: either async uploading is disabled or no task was queued.
    if not self.async_mode or not self.async_task_peeding:
        return

    if self._async_loop:
        self._async_loop.run_until_complete(self._sync_tasks())

    failures = list(self._exception_list)
    if failures:
        hostname = socket.gethostname()
        # Report all failed uploads before aborting; raising inside the loop
        # would hide every failure after the first one from the log.
        for error_msg, file_id in failures:
            logger.error(
                f"Node:{hostname}, Error: Checkpoint {self._to_be_del_files[file_id]} "
                f"failed on step {self.upload_count}: {error_msg}"
            )

        # TODO: Re-upload in sync mode
        error_msg, file_id = failures[0]
        raise RuntimeError(
            f"Failed to upload {self._to_be_del_files[file_id]} " f"on step {self.upload_count}: {error_msg}"
        )

    self._del_tmp_folder()
    self._exception_list.clear()
    self._to_be_del_files.clear()
    self.async_task_peeding = False

    if gpc.is_rank_for_log():
        self.upload_count += 1
        # NOTE(review): async_mode was already checked on entry; the re-check is
        # kept in case one of the calls above can switch it off — confirm.
        if self.async_mode:
            self.save(
                os.path.join(self.latest_save_folder, f"{self.latest_save_step}.step"),
                saved_obj={"step": self.latest_save_step},
                async_upload=False,
            )
603
604
605 storage_manager: StorageManager = None

Callers 11

load · Method · 0.95
try_save_checkpoint · Method · 0.80
_sync_tasks · Method · 0.80
wait_async_upload_finish · Function · 0.80
_communicate · Function · 0.80
backward · Method · 0.80
_pre_forward_hook · Method · 0.80
broadcast_params · Method · 0.80

Calls 4

_sync_tasks · Method · 0.95
_del_tmp_folder · Method · 0.95
save · Method · 0.95
is_rank_for_log · Method · 0.80

Tested by

no test coverage detected