(archive_path, file_obj=None)
| 17 | |
| 18 | |
def openArchive(archive_path, file_obj=None):
    """Return an open archive object for ``archive_path``, reusing a cached one.

    The handle is looked up in the module-level ``archive_cache`` by path.
    On a miss the archive is opened according to the path suffix:

    * ``tar.gz``  -> ``tarfile.open(..., mode="r:gz")``
    * ``tar.bz2`` -> ``tarfile.open(..., mode="r:bz2")``
    * otherwise   -> ``zipfile.ZipFile``

    The new handle is stored in ``archive_cache`` and a call to
    ``closeArchive(archive_path)`` is scheduled 5 seconds later through
    ``gevent.spawn_later``.

    Args:
        archive_path: Path of the archive; also the cache key.
        file_obj: Optional file-like object. For tar archives it is passed
            as ``fileobj``; for zip archives it is used in place of the
            path when it is truthy.

    Returns:
        The ``tarfile.TarFile`` or ``zipfile.ZipFile`` stored in the cache
        for ``archive_path``.
    """
    # Cache hit: hand back the existing handle, no new close timer.
    if archive_path in archive_cache:
        return archive_cache[archive_path]

    # Pick the tar decompression mode from the suffix; None means "not a tar".
    tar_mode = None
    if archive_path.endswith("tar.gz"):
        tar_mode = "r:gz"
    elif archive_path.endswith("tar.bz2"):
        tar_mode = "r:bz2"

    if tar_mode is None:
        import zipfile
        handle = zipfile.ZipFile(file_obj or archive_path)
    else:
        import tarfile
        handle = tarfile.open(archive_path, fileobj=file_obj, mode=tar_mode)

    archive_cache[archive_path] = handle
    gevent.spawn_later(5, lambda: closeArchive(archive_path))  # Close after 5 sec

    return archive_cache[archive_path]
| 34 | |
| 35 | |
| 36 | def openArchiveFile(archive_path, path_within, file_obj=None): |
no test coverage detected