Loads the necessary blobs from the checkpoints to the current node. Args: blob_names: A list of strings. Each string is the name of a blob. epoch: An integer. The checkpoint epoch to load from. session: A Session object to execute the Load ops.
(self, nodes, blob_names, epoch, session)
| 495 | path_type=path_type) |
| 496 | |
def load_blobs_locally(self, nodes, blob_names, epoch, session):
    """Loads the necessary blobs from the checkpoints to the current node.

    On the first call, one CheckpointManager is created per entry of
    `nodes` (each inside that node's `Node(...)` context) and the
    resulting (node, manager) pairs are cached on the instance. On later
    calls the cached nodes must equal `nodes`, in the same order.

    Args:
        nodes: An iterable of nodes to load checkpoints for. It must
            match the nodes of the cached managers when they already
            exist.
        blob_names: A list of strings. Each string is the name of a
            blob.
        epoch: An integer. The checkpoint epoch to load from.
        session: A Session object to execute the Load ops.

    Returns:
        True if the checkpoint DB of every node exists and its load task
        has been run. False as soon as one node's DB is reported missing;
        load tasks of the nodes handled before it have already been run
        at that point.

    Raises:
        AssertionError: If managers are already cached for a different
            list of nodes.
    """
    if self._node_managers is not None:
        # The cache was built by iterating `nodes`, so compare against the
        # iterated form; an equivalent tuple must not fail spuriously.
        assert [node for node, _ in self._node_managers] == list(nodes), (
            'nodes do not match the nodes of the existing node managers')
    else:
        # Build into a local list and publish it only once it is complete.
        # Otherwise an exception part-way through would leave a truncated
        # cache behind and every later call would fail the check above.
        node_managers = []
        for node in nodes:
            with Node(node):
                manager = CheckpointManager(
                    db_prefix=self._db_prefix,
                    node_name=str(node),
                    db_type=self._db_type)
                node_managers.append((node, manager))
        self._node_managers = node_managers
    for _, manager in self._node_managers:
        # Ask the node whether its checkpoint DB for this epoch exists
        # before attempting to load from it.
        existence_task = manager.check_db_exists(epoch)
        session.run(existence_task)
        existence = existence_task.outputs()[0].fetch()
        if not existence:
            logger.info('DB %s does not exist!' %
                        db_name(epoch, manager._node_name, manager._db_prefix))
            return False
        load_task = manager.load_blobs_from_checkpoint(blob_names, epoch)
        session.run(load_task)
    logger.info('Successfully loaded from checkpoints.')
    return True
| 530 | |
| 531 | def get_ckpt_db_name(self, node_name, epoch): |
| 532 | """Returns the DB name of the given node and the given epoch. |
no test coverage detected