(
path,
repo=None,
rev=None,
remote=None,
mode="r",
encoding=None,
config=None,
remote_config=None,
)
| 256 | |
| 257 | |
def _open(
    path,
    repo=None,
    rev=None,
    remote=None,
    mode="r",
    encoding=None,
    config=None,
    remote_config=None,
):
    """Generator that opens ``path`` inside a repo and yields the file object.

    The repo is opened through ``Repo.open`` at revision ``rev`` with
    ``subrepos`` and ``uninitialized`` enabled; ``remote``, ``config`` and
    ``remote_config`` are forwarded to it unchanged.

    An absolute ``path`` is read through a ``DataFileSystem`` built on the
    repo's ``"local"`` data index and is used verbatim.  Any other path is
    read through a ``DVCFileSystem`` (with subrepos) after conversion with
    ``from_os_path``.

    Yields:
        The object returned by the filesystem's ``open(...)`` for the
        requested ``mode`` and ``encoding``.

    Raises:
        FileMissingError: when a ``FileNotFoundError`` surfaces while the
            file is being opened or while the caller holds the yielded
            object (the ``yield`` sits inside the ``try``).
        dvc.exceptions.IsADirectoryError: when a builtin
            ``IsADirectoryError`` surfaces in that same region.
    """
    with Repo.open(
        repo,
        rev=rev,
        subrepos=True,
        uninitialized=True,
        remote=remote,
        config=config,
        remote_config=remote_config,
    ) as dvc_repo, _wrap_exceptions(dvc_repo, path):
        # Imports are deferred to call time and kept inside the guarded
        # region so that any failure here passes through _wrap_exceptions.
        import os
        from typing import TYPE_CHECKING, Union

        from dvc.exceptions import IsADirectoryError as DvcIsADirectoryError
        from dvc.fs.data import DataFileSystem
        from dvc.fs.dvc import DVCFileSystem

        if TYPE_CHECKING:
            from dvc.fs import FileSystem

        filesystem: Union[FileSystem, DataFileSystem, DVCFileSystem]
        if not os.path.isabs(path):
            # Relative path: resolve it through the repo-aware filesystem.
            filesystem = DVCFileSystem(repo=dvc_repo, subrepos=True)
            target = filesystem.from_os_path(path)
        else:
            # Absolute path: look it up in the local data index as-is.
            filesystem = DataFileSystem(index=dvc_repo.index.data["local"])
            target = path

        try:
            with filesystem.open(target, mode=mode, encoding=encoding) as stream:
                yield stream
        except FileNotFoundError as err:
            raise FileMissingError(path) from err
        except IsADirectoryError as err:
            raise DvcIsADirectoryError(f"'{path}' is a directory") from err
| 303 | |
| 304 | |
| 305 | def read( |
Nothing calls this directly.
No test coverage detected.