(self,
path, # Path to directory or zip.
cfg: DictConfig, # Config
resolution=None, # Unused arg for backward compatibility
load_n_consecutive: int=None, # Should we load first N frames for each video?
load_n_consecutive_random_offset: bool=True, # Should we use a random offset when loading consecutive frames?
subsample_factor: int=1, # Sampling factor, i.e. decreasing the temporal resolution
discard_short_videos: bool=False, # Should we discard videos that are shorter than `load_n_consecutive`?
**super_kwargs, # Additional arguments for the Dataset base class.
)
| 259 | |
| 260 | class VideoFramesFolderDataset(Dataset): |
| 261 | def __init__(self, |
| 262 | path, # Path to directory or zip. |
| 263 | cfg: DictConfig, # Config |
| 264 | resolution=None, # Unused arg for backward compatibility |
| 265 | load_n_consecutive: int=None, # Should we load first N frames for each video? |
| 266 | load_n_consecutive_random_offset: bool=True, # Should we use a random offset when loading consecutive frames? |
| 267 | subsample_factor: int=1, # Sampling factor, i.e. decreasing the temporal resolution |
| 268 | discard_short_videos: bool=False, # Should we discard videos that are shorter than `load_n_consecutive`? |
| 269 | **super_kwargs, # Additional arguments for the Dataset base class. |
| 270 | ): |
| 271 | self.sampling_dict = OmegaConf.to_container(OmegaConf.create({**cfg.sampling})) if 'sampling' in cfg else None |
| 272 | self.max_num_frames = cfg.max_num_frames |
| 273 | self._path = path |
| 274 | self._zipfile = None |
| 275 | self.load_n_consecutive = load_n_consecutive |
| 276 | self.load_n_consecutive_random_offset = load_n_consecutive_random_offset |
| 277 | self.subsample_factor = subsample_factor |
| 278 | print(subsample_factor) |
| 279 | self.discard_short_videos = discard_short_videos |
| 280 | |
| 281 | if self.subsample_factor > 1 and self.load_n_consecutive is None: |
| 282 | raise NotImplementedError("Can do subsampling only when loading consecutive frames.") |
| 283 | |
| 284 | listdir_full_paths = lambda d: sorted([os.path.join(d, x) for x in os.listdir(d)]) |
| 285 | name = os.path.splitext(os.path.basename(self._path))[0] |
| 286 | |
| 287 | if os.path.isdir(self._path): |
| 288 | self._type = 'dir' |
| 289 | # We assume that the depth is 2 |
| 290 | self._all_objects = {o for d in listdir_full_paths(self._path) for o in (([d] + listdir_full_paths(d)) if os.path.isdir(d) else [d])} |
| 291 | self._all_objects = {os.path.relpath(o, start=os.path.dirname(self._path)) for o in {self._path}.union(self._all_objects)} |
| 292 | elif self._file_ext(self._path) == '.zip': |
| 293 | self._type = 'zip' |
| 294 | self._all_objects = set(self._get_zipfile().namelist()) |
| 295 | else: |
| 296 | raise IOError('Path must be either a directory or point to a zip archive') |
| 297 | |
| 298 | PIL.Image.init() |
| 299 | self._video_dir2frames = {} |
| 300 | objects = sorted([d for d in self._all_objects]) |
| 301 | root_path_depth = len(os.path.normpath(objects[0]).split(os.path.sep)) |
| 302 | curr_d = objects[1] # Root path is the first element |
| 303 | |
| 304 | for o in objects[1:]: |
| 305 | curr_obj_depth = len(os.path.normpath(o).split(os.path.sep)) |
| 306 | |
| 307 | if self._file_ext(o) in PIL.Image.EXTENSION: |
| 308 | assert o.startswith(curr_d), f"Object {o} is out of sync. It should lie inside {curr_d}" |
| 309 | assert curr_obj_depth == root_path_depth + 2, "Frame images should be inside directories" |
| 310 | if not curr_d in self._video_dir2frames: |
| 311 | self._video_dir2frames[curr_d] = [] |
| 312 | self._video_dir2frames[curr_d].append(o) |
| 313 | elif self._file_ext(o) == 'json': |
| 314 | assert curr_obj_depth == root_path_depth + 1, "Classes info file should be inside the root dir" |
| 315 | pass |
| 316 | else: |
| 317 | # We encountered a new directory |
| 318 | assert curr_obj_depth == root_path_depth + 1, f"Video directories should be inside the root dir. {o} is not." |
Nothing calls this constructor (`VideoFramesFolderDataset.__init__`) directly.
No test coverage was detected for it.