
Function compress_with_zlib

bugsink/streams.py:194–208
(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE)

Source from the content-addressed store, hash-verified

def compress_with_zlib(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
    # mostly useful for testing (compress-decompress cycles)

    output_stream = io.BytesIO()
    z = zlib.compressobj(wbits=wbits)

    while True:
        uncompressed_chunk = input_stream.read(chunk_size)
        if not uncompressed_chunk:
            break

        output_stream.write(z.compress(uncompressed_chunk))

    output_stream.write(z.flush())
    return output_stream.getvalue()
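The source comment says the function is mostly useful for compress-decompress cycles in tests. A minimal sketch of such a cycle follows. It assumes a placeholder value for `DEFAULT_CHUNK_SIZE` (the real constant is defined in bugsink/streams.py and is not shown here), repeats the function body so the example is self-contained, and introduces a hypothetical `roundtrip` helper that is not part of the repository. The `wbits` values follow the standard-library `zlib` convention: `zlib.MAX_WBITS` for the zlib container, `zlib.MAX_WBITS | 16` for gzip, and `-zlib.MAX_WBITS` for raw deflate.

```python
import gzip
import io
import zlib

# Assumption: placeholder value; the real constant lives in bugsink/streams.py.
DEFAULT_CHUNK_SIZE = 8 * 1024


def compress_with_zlib(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
    # Same logic as the source listing, repeated so this sketch runs on its own.
    output_stream = io.BytesIO()
    z = zlib.compressobj(wbits=wbits)

    while True:
        uncompressed_chunk = input_stream.read(chunk_size)
        if not uncompressed_chunk:
            break
        output_stream.write(z.compress(uncompressed_chunk))

    output_stream.write(z.flush())
    return output_stream.getvalue()


def roundtrip(payload, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
    # Hypothetical helper: compress in chunks, then decompress in one call
    # using the same wbits so the container format matches.
    compressed = compress_with_zlib(io.BytesIO(payload), wbits, chunk_size)
    return zlib.decompress(compressed, wbits=wbits)


ZLIB_FORMAT = zlib.MAX_WBITS        # zlib header and trailer
GZIP_FORMAT = zlib.MAX_WBITS | 16   # gzip header and trailer
RAW_DEFLATE = -zlib.MAX_WBITS       # raw deflate stream, no header

payload = b"bugsink event payload " * 1000

for wbits in (ZLIB_FORMAT, GZIP_FORMAT, RAW_DEFLATE):
    assert roundtrip(payload, wbits) == payload

# Output produced with the gzip wbits value is readable by the gzip module.
assert gzip.decompress(compress_with_zlib(io.BytesIO(payload), GZIP_FORMAT)) == payload
```

A very small `chunk_size` (for example 7) is a convenient way to exercise the chunking loop in a test, since the input is then split across many `read` calls.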

Callers 4

send_to_server · Method · 0.90
prepare · Method · 0.90

Calls 3

read · Method · 0.45
write · Method · 0.45
flush · Method · 0.45

Tested by 3

prepare · Method · 0.72