| 5 | |
| 6 | |
| 7 | def ls_url(url, *, fs_config=None, recursive=False, maxdepth=None, config=None): |
| 8 | fs, fs_path = parse_external_url(url, fs_config=fs_config, config=config) |
| 9 | try: |
| 10 | info = fs.info(fs_path) |
| 11 | except FileNotFoundError as exc: |
| 12 | raise URLMissingError(url) from exc |
| 13 | if maxdepth == 0 or info["type"] != "directory": |
| 14 | return [{"path": info["name"], "isdir": False}] |
| 15 | |
| 16 | if isinstance(fs, LocalFileSystem): |
| 17 | # dvc's LocalFileSystem does not support maxdepth yet |
| 18 | walk = _LocalFileSystem().walk |
| 19 | else: |
| 20 | walk = fs.walk |
| 21 | |
| 22 | ret = [] |
| 23 | for root, dirs, files in walk(fs_path, detail=True, maxdepth=maxdepth): |
| 24 | parts = fs.relparts(root, fs_path) |
| 25 | if parts == (".",): |
| 26 | parts = () |
| 27 | if not recursive or (maxdepth and len(parts) >= maxdepth - 1): |
| 28 | files.update(dirs) |
| 29 | |
| 30 | for info in files.values(): |
| 31 | ls_info = { |
| 32 | "path": fs.relpath(info["name"], fs_path), |
| 33 | "isdir": info["type"] == "directory", |
| 34 | "size": info.get("size"), |
| 35 | } |
| 36 | ret.append(ls_info) |
| 37 | |
| 38 | if not recursive: |
| 39 | break |
| 40 | |
| 41 | return ret |