(self)
| 432 | ) |
| 433 | |
| 434 | def get_bytes(self): |
| 435 | buffer = io.BytesIO() |
| 436 | for chunk in self.get_bytes_stream(): |
| 437 | buffer.write(chunk) |
| 438 | data = buffer.getvalue() |
| 439 | try: |
| 440 | # 解压数据 |
| 441 | with zipfile.ZipFile(buffer) as zip_file: |
| 442 | names = [name for name in zip_file.namelist() if not name.endswith("/")] |
| 443 | if len(names) != 1: |
| 444 | return data |
| 445 | # 用 zip 内实际存储的条目名,避免文件名不匹配 |
| 446 | name = names[0] |
| 447 | return zip_file.read(name) |
| 448 | except zipfile.BadZipFile: |
| 449 | # 如果数据不是zip格式,直接返回原始数据 |
| 450 | return data |
| 451 | |
| 452 | def get_bytes_stream(self, start=0, end=None, chunk_size=64 * 1024): |
| 453 | def _read_with_offset(): |
no test coverage detected