(self, test_data)
| 42 | self._check_decompression(test_data) |
| 43 | |
| 44 | def _test_multiple_process(self, test_data): |
| 45 | # Write chunked compression to temp file. |
| 46 | temp_compressed = _test_utils.get_temp_compressed_name(test_data) |
| 47 | with open(temp_compressed, 'wb') as out_file: |
| 48 | with open(test_data, 'rb') as in_file: |
| 49 | read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE) |
| 50 | for data in iter(read_chunk, b''): |
| 51 | out_file.write(self.compressor.process(data)) |
| 52 | out_file.write(self.compressor.finish()) |
| 53 | self._check_decompression(test_data) |
| 54 | |
| 55 | def _test_multiple_process_and_flush(self, test_data): |
| 56 | # Write chunked and flushed compression to temp file. |
nothing calls this directly
no test coverage detected