MCPcopy Index your code
hub / github.com/HelloZeroNet/ZeroNet / ZipStream

Class ZipStream

plugins/Sidebar/ZipStream.py:5–47  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

3import zipfile
4
class ZipStream(object):
    """Build a ZIP archive of a directory incrementally, as a readable stream.

    The instance passes itself to ``zipfile.ZipFile`` as the output file
    object: everything the archive writer emits lands in an in-memory buffer
    through ``write()``, and the consumer drains that buffer with ``read()``.
    Files are added lazily, one per step of the ``getFileList()`` generator,
    so only the not-yet-read part of the archive is held in memory.
    """

    def __init__(self, dir_path):
        """Prepare a stream that zips every file found under *dir_path*."""
        self.dir_path = dir_path
        self.pos = 0  # Logical offset in the archive (what tell() reports)
        self.buff_pos = 0  # Archive offset of the first byte still in self.buff
        # NOTE(review): the ZipFile is created *before* self.buff exists, and
        # that ordering appears to matter. On CPython 3 the constructor probes
        # tell()/seek() on the target in 'w' mode; the seek probe fails with
        # AttributeError here, which zipfile presumably treats as "not
        # seekable" (no header rewrites). Re-test before reordering.
        self.zf = zipfile.ZipFile(self, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
        self.buff = io.BytesIO()
        self.file_list = self.getFileList()

    def getFileList(self):
        """Yield ``(file_path, relative_path)`` for each file under dir_path.

        ``file_path`` is the on-disk location and ``relative_path`` the path
        relative to ``dir_path`` (passed to ``ZipFile.write`` as the archive
        name by ``read()``). Once the walk is exhausted the archive is closed,
        so whatever ``ZipFile.close()`` emits goes through ``write()`` too.
        """
        for root, dirs, files in os.walk(self.dir_path):
            relative_root = os.path.relpath(root, self.dir_path)
            for file_name in files:
                file_path = os.path.join(root, file_name)
                relative_path = os.path.join(relative_root, file_name)
                yield file_path, relative_path
        self.zf.close()

    def read(self, size=60 * 1024):
        """Return the next chunk of archive bytes, or ``b""`` when finished.

        *size* is a soft threshold, not a hard limit: whole files are added
        until at least *size* bytes are buffered (or no files remain), then
        the entire buffer is returned, so a chunk may be larger than *size*.
        """
        for file_path, relative_path in self.file_list:
            self.zf.write(file_path, relative_path)
            if self.buff.tell() >= size:
                break
        # Hand out everything buffered so far and start over with an empty buffer
        self.buff.seek(0)
        back = self.buff.read()
        self.buff.truncate(0)
        self.buff.seek(0)
        self.buff_pos += len(back)
        return back

    def write(self, data):
        """Append *data* to the buffer and return the number of bytes written."""
        self.pos += len(data)
        self.buff.write(data)
        return len(data)

    def tell(self):
        """Return the current logical offset in the archive."""
        return self.pos

    def seek(self, pos, whence=0):
        """Move the logical position and return the new absolute offset.

        *whence* follows the usual file-object convention (0 = start,
        1 = current position, 2 = end of the data written so far). Bytes
        already returned by ``read()`` are gone: seeking before ``buff_pos``
        only updates the logical position and leaves the buffer untouched.
        """
        if whence == os.SEEK_SET:
            target = pos
        elif whence == os.SEEK_CUR:
            target = self.pos + pos
        elif whence == os.SEEK_END:
            target = self.buff_pos + self.buff.seek(0, os.SEEK_END) + pos
        else:
            raise ValueError("invalid whence (%r, should be 0, 1 or 2)" % (whence,))
        if target >= self.buff_pos:
            self.buff.seek(target - self.buff_pos)
        self.pos = target
        return target

    def flush(self):
        """Do nothing; present because the archive writer expects a flush()."""
        pass
48
49
50if __name__ == "__main__":

Callers 2

streamZipMethod · 0.90
ZipStream.pyFile · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected