| 164 | return data[0] |
| 165 | |
def get_multi_range(self, ranges: list[tuple[int, int]]) -> list[bytes]:
    """Fetch several byte ranges of the remote file with a single GET request.

    Args:
        ranges: Half-open ``(start, end)`` byte offsets. Every ``end`` must be
            strictly greater than its ``start``.

    Returns:
        One ``bytes`` payload per requested range. Multipart parts are
        returned in the order the server sent them, which is assumed to be
        the request order. An empty ``ranges`` list yields ``[]`` without
        issuing a request.

    Raises:
        AssertionError: If any range is empty or reversed.
        URLFileException: If the status is neither 200 nor 206, a multipart
            response lacks a boundary, or the number of payloads received
            does not match the number of ranges requested.
    """
    if not ranges:
        # Nothing to fetch; avoid sending a malformed "Range: bytes=" header.
        return []

    assert all(e > s for s, e in ranges), "Range end must be greater than start"
    # HTTP range requests are inclusive, hence the e-1.
    rs = [f"{s}-{e-1}" for s, e in ranges if e > s]

    r = self._request("GET", self._url, headers={"Range": "bytes=" + ",".join(rs)})
    if r.status not in [200, 206]:
        raise URLFileException(f"Expected 206 or 200 response {r.status} ({self._url})")

    # Keep the header's original case: the media type is case-insensitive,
    # but the boundary token must match the body byte-for-byte.
    ctype = r.headers.get("content-type") or ""
    if "multipart/byteranges" not in ctype.lower():
        if r.status == 200:
            # The server ignored the Range header and sent the whole file;
            # cut the requested windows out of it locally.
            return [r.data[s:e] for s, e in ranges]
        if len(ranges) != 1:
            raise URLFileException(f"Expected {len(ranges)} parts, got 1 ({self._url})")
        return [r.data]

    m = re.search(r'boundary="?([^";]+)"?', ctype, re.IGNORECASE)
    if not m:
        raise URLFileException(f"Missing multipart boundary ({self._url})")
    delimiter = b"--" + m.group(1).encode()

    # Everything before the first delimiter is preamble, not a part.
    _preamble, *sections = r.data.split(delimiter)

    parts = []
    for section in sections:
        if section.startswith(b"--"):
            # Closing delimiter ("--boundary--"); whatever follows is epilogue.
            break
        _headers, sep, body = section.partition(b"\r\n\r\n")
        if not sep:
            continue
        # Drop only the single CRLF that precedes the next delimiter. The
        # payload is binary and may itself end in CR/LF bytes.
        if body.endswith(b"\r\n"):
            body = body[:-2]
        if body:
            parts.append(body)

    if len(parts) != len(ranges):
        raise URLFileException(f"Expected {len(ranges)} parts, got {len(parts)} ({self._url})")
    return parts
| 194 | |
def seekable(self) -> bool:
    """Return ``True``; this object always reports itself as seekable."""
    return True