Extends the `open` function to support remote files using `fsspec`. It also has a retry mechanism in case the connection fails. The `args` and `kwargs` are passed to `fsspec.open`, except `token`, which is used for queries to private repos on huggingface.co. Args: file (`str`): Path name of the file to be opened.
(file: str, mode="r", *args, download_config: Optional[DownloadConfig] = None, **kwargs)
| 943 | |
| 944 | |
| 945 | def xopen(file: str, mode="r", *args, download_config: Optional[DownloadConfig] = None, **kwargs): |
| 946 | """Extend `open` function to support remote files using `fsspec`. |
| 947 | |
| 948 | It also has a retry mechanism in case connection fails. |
| 949 | The `args` and `kwargs` are passed to `fsspec.open`, except `token` which is used for queries to private repos on huggingface.co |
| 950 | |
| 951 | Args: |
| 952 | file (`str`): Path name of the file to be opened. |
| 953 | mode (`str`, *optional*, default "r"): Mode in which the file is opened. |
| 954 | *args: Arguments to be passed to `fsspec.open`. |
| 955 | download_config : mainly use token or storage_options to support different platforms and auth types. |
| 956 | **kwargs: Keyword arguments to be passed to `fsspec.open`. |
| 957 | |
| 958 | Returns: |
| 959 | file object |
| 960 | """ |
| 961 | # This works as well for `xopen(str(Path(...)))` |
| 962 | file_str = _as_str(file) |
| 963 | main_hop, *rest_hops = file_str.split("::") |
| 964 | if is_local_path(main_hop): |
| 965 | # ignore fsspec-specific kwargs |
| 966 | kwargs.pop("block_size", None) |
| 967 | return open(main_hop, mode, *args, **kwargs) |
| 968 | # add headers and cookies for authentication on the HF Hub and for Google Drive |
| 969 | file, storage_options = _prepare_path_and_storage_options(file_str, download_config=download_config) |
| 970 | kwargs = {**kwargs, **(storage_options or {})} |
| 971 | |
| 972 | max_retries = config.STREAMING_OPEN_MAX_RETRIES |
| 973 | |
| 974 | disconnect_err = None |
| 975 | for retry in range(1, max_retries + 1): |
| 976 | try: |
| 977 | fs, fs_token, paths = fsspec.get_fs_token_paths( |
| 978 | file, |
| 979 | mode, |
| 980 | storage_options=kwargs, |
| 981 | ) |
| 982 | file_obj = fs.open(paths[0], mode) |
| 983 | if hasattr(fs, "of") and hasattr(fs.of, "__exit__"): |
| 984 | file_obj._fs = fs # keep a reference or the fs might close the file on gc |
| 985 | break |
| 986 | except CONNECTION_ERRORS_TO_RETRY as err: |
| 987 | disconnect_err = err |
| 988 | logger.warning( |
| 989 | f"Failed to connect to remote data host. Retrying in {config.STREAMING_OPEN_RETRY_INTERVAL}sec [{retry}/{max_retries}]" |
| 990 | ) |
| 991 | time.sleep(config.STREAMING_OPEN_RETRY_INTERVAL) |
| 992 | except ValueError as e: |
| 993 | if str(e) == "Cannot seek streaming HTTP file": |
| 994 | raise NonStreamableDatasetError( |
| 995 | "Streaming is not possible for this dataset because data host server doesn't support HTTP range " |
| 996 | "requests. You can still load this dataset in non-streaming mode by passing `streaming=False` (default)" |
| 997 | ) from e |
| 998 | else: |
| 999 | raise |
| 1000 | except FileNotFoundError: |
| 1001 | if file.startswith(config.HF_ENDPOINT): |
| 1002 | raise FileNotFoundError( |