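Attempts to decompress a provided file (.gz, .bz2 or .zip) and write the decompressed data to temporary files: one file for a .gz or .bz2 input, one file per archive member for a .zip input. The list of created temporary files is returned.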
(filepath, extension, unzipdir)
| 103 | |
| 104 | |
def decompress_file(filepath, extension, unzipdir):
    """
    Attempts to decompress a provided file and write the data to temporary
    files. The list of created temporary files is returned.

    Parameters:
        filepath: path of the compressed file to read.
        extension: selects the decompressor; '.gz', '.bz2' and '.zip' are
            handled, any other value results in an empty list.
        unzipdir: directory in which the temporary files are created.

    Returns:
        A list with the names of the temporary files that were written: one
        for a .gz/.bz2 input, one per archive member for a .zip input.
        Streams that fail the initial readability probe are logged and
        skipped. An empty list is returned when a .zip archive (or one of
        its members) cannot be opened.

    The temporary files are created with delete=False, so they stay on disk
    after this function returns. For .zip input the user is always prompted
    for a password through getpass.
    """
    # Copy in fixed-size chunks. Iterating a binary stream "line by line"
    # would split on b"\n" and could pull an arbitrarily large payload that
    # contains no newline bytes into memory in one piece.
    chunk_size = 64 * 1024

    filename = os.path.split(filepath)[-1]
    logger.debug("Attempting to decompress {!r}".format(filepath))

    archive = None
    openfiles = []
    tempfiles = []
    try:
        if extension == '.gz':
            openfiles.append(gzip.open(filepath, 'rb'))
        elif extension == '.bz2':
            openfiles.append(bz2.open(filepath, 'rb'))
        elif extension == '.zip':
            pswd = getpass("Enter password for .zip file {!r} [default: none]: ".format(filepath))
            pswd = pswd.encode()  # TODO I'm not sure encoding to utf-8 will work in all cases
            try:
                archive = zipfile.ZipFile(filepath)
                # Every member is opened up front so that a wrong password
                # or a corrupt entry rejects the whole archive before any
                # temporary file has been written.
                for member in archive.namelist():
                    openfiles.append(archive.open(member, 'r', pswd))
            except (RuntimeError, zipfile.BadZipFile) as e:
                logger.error("Could not process .zip file {!r}. {!s}".format(filepath, e))
                # The finally clause below releases whatever was opened.
                return []

        for openfile in openfiles:
            with openfile:
                try:
                    # check if this file is actually something decompressable
                    openfile.peek(1)
                except OSError as e:
                    logger.error("Could not process compressed file {!r}. {!s}".format(filepath, e))
                    continue
                with tempfile.NamedTemporaryFile(dir=unzipdir, delete=False, prefix=filename) as tfile:
                    for chunk in iter(lambda: openfile.read(chunk_size), b''):
                        tfile.write(chunk)
                tempfiles.append(tfile.name)
    finally:
        # Release every handle on all exit paths, including the early return
        # above and exceptions raised while copying. Closing a stream that
        # the `with` block already closed is harmless.
        for handle in openfiles:
            handle.close()
        if archive is not None:
            archive.close()
    return tempfiles
| 145 | |
| 146 | |
| 147 | def print_plugins(plugins): |
no test coverage detected