| 155 | super(TestBagFile, self).__init__(io.BytesIO(), *args, **kwargs) |
| 156 | |
| 157 | def test_write_read_integrity(self): |
| 158 | for reader_t in [Rosbag1Reader, Rosbag2Reader]: |
| 159 | # TODO: rosbags cannot overwrite existing paths, this forces us |
| 160 | # to do this here to get only a filepath: |
| 161 | tmp_filename = tempfile.NamedTemporaryFile(delete=True).name |
| 162 | if reader_t is Rosbag1Reader: |
| 163 | bag_out = Rosbag1Writer(tmp_filename) |
| 164 | else: |
| 165 | bag_out = Rosbag2Writer( |
| 166 | tmp_filename, version=SETTINGS.ros2_bag_format_version |
| 167 | ) |
| 168 | bag_out.open() |
| 169 | traj_out = helpers.fake_trajectory(1000, 0.1) |
| 170 | self.assertTrue(traj_out.check()) |
| 171 | file_interface.write_bag_trajectory( |
| 172 | bag_out, traj_out, "/test", frame_id="map" |
| 173 | ) |
| 174 | bag_out.close() |
| 175 | bag_in = reader_t(tmp_filename) |
| 176 | bag_in.open() |
| 177 | traj_in = file_interface.read_bag_trajectory(bag_in, "/test") |
| 178 | self.assertIsInstance(traj_in, PoseTrajectory3D) |
| 179 | self.assertTrue(traj_in.check()) |
| 180 | self.assertTrue(traj_out == traj_in) |
| 181 | self.assertEqual(traj_in.meta["frame_id"], "map") |
| 182 | |
| 183 | |
| 184 | class TestResultFile(MockFileTestCase): |