MCPcopy
hub / github.com/Textualize/toolong / open_compressed

Method open_compressed

src/toolong/log_file.py:93–126  ·  view source on GitHub ↗
(self, exit_event: Event, encoding: str)

Source from the content-addressed store, hash-verified

91 return True
92
def open_compressed(self, exit_event: Event, encoding: str) -> bool:
    """Decompress ``self.path`` into a temporary file and adopt it as ``self.file``.

    The compressed data is streamed in fixed-size chunks into an anonymous
    temporary file. On success the temporary file becomes ``self.file``,
    ``self.size`` is set to the number of decompressed bytes written, and
    ``self.can_tail`` is set to ``False``.

    Args:
        exit_event: Checked after every chunk is written; when set, the
            temporary file is discarded and the method gives up.
        encoding: Compression format, either ``"gzip"`` or ``"bzip2"``.

    Returns:
        ``True`` if the whole file was decompressed, ``False`` if
        ``exit_event`` was set before decompression finished.

    Raises:
        AssertionError: If ``encoding`` is not a supported format.
    """
    from tempfile import TemporaryFile

    chunk_size = 1024 * 256

    # Pick the decompressor before allocating anything, so an unsupported
    # encoding cannot leave an open temporary file behind.
    if encoding == "gzip":
        import gzip

        opener = gzip.open
    elif encoding == "bzip2":
        import bz2

        opener = bz2.open
    else:
        # Shouldn't get here
        raise AssertionError("Not supported")

    temp_file = TemporaryFile("wb+")
    try:
        with opener(self.path, "rb") as compressed_file:
            while data := compressed_file.read(chunk_size):
                temp_file.write(data)
                if exit_event.is_set():
                    temp_file.close()
                    return False
        temp_file.flush()
        # The write position after the final chunk is the decompressed size.
        size = temp_file.tell()
    except BaseException:
        # Opening or decoding failed (missing file, corrupt archive, ...):
        # release the temporary file before propagating the error.
        temp_file.close()
        raise

    self.file = temp_file
    self.size = size
    self.can_tail = False
    return True
127
128 def close(self) -> None:
129 if self.file is not None:

Callers 1

openMethod · 0.95

Calls 2

openMethod · 0.80
closeMethod · 0.45

Tested by

no test coverage detected