(self)
| 239 | shutil.rmtree(tmpdir) |
| 240 | |
def test_ckpt_save_failure(self):
    """Check that a job still completes when saving checkpoints fails.

    Each node's checkpoint manager is pointed at a directory that is
    expected not to exist, which emulates a failure while saving
    checkpoints. Training must still report the full number of epochs
    for every node.
    """
    # Deliberately bogus checkpoint location used to emulate save failures.
    missing_dir = "/tmp/path_does_not_exist/"
    node_count = 3

    workspace.ResetWorkspace()
    for rank in range(node_count):
        node_ws = workspace.C.Workspace()
        session = LocalSession(node_ws)
        ckpt_manager = MultiNodeCheckpointManager(missing_dir, 'minidb')
        with Cluster():
            with Job() as job:
                build_pipeline(rank)
            job.compile(LocalSession)
            epochs_run = JobRunner(job, ckpt_manager).train(session)
        # A failed checkpoint save must not fail the job: every epoch
        # should still have been executed.
        self.assertEqual(epochs_run, len(EXPECTED_TOTALS))
| 264 | |
| 265 | def test_download_group_simple(self): |
| 266 | """ |
nothing calls this directly
no test coverage detected