MCPcopy Index your code
hub / github.com/AutoRecon/AutoRecon / _read

Method _read

autorecon/io.py:114–162  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

112
113 # Read lines from the stream until it ends.
114 async def _read(self):
115 while True:
116 if self.stream.at_eof():
117 break
118 try:
119 line = (await self.stream.readline()).decode('utf8').rstrip()
120 except ValueError:
121 error('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} A line was longer than 64 KiB and cannot be processed. Ignoring.')
122 continue
123
124 if line != '':
125 info('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} ' + line.strip().replace('{', '{{').replace('}', '}}'), verbosity=3)
126
127 # Check lines for pattern matches.
128 for p in self.patterns:
129 description = ''
130
131 # Match and replace entire pattern.
132 match = p.pattern.search(line)
133 if match:
134 if p.description:
135 description = p.description.replace('{match}', line[match.start():match.end()])
136
137 # Match and replace substrings.
138 matches = p.pattern.findall(line)
139 if len(matches) > 0 and isinstance(matches[0], tuple):
140 matches = list(matches[0])
141
142 match_count = 1
143 for match in matches:
144 if p.description:
145 description = description.replace('{match' + str(match_count) + '}', match)
146 match_count += 1
147
148 async with self.target.lock:
149 with open(os.path.join(self.target.scandir, '_patterns.log'), 'a') as file:
150 info('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} {bmagenta}' + description + '{rst}', verbosity=2)
151 file.writelines(description + '\n\n')
152 else:
153 info('{bright}[{yellow}' + self.target.address + '{crst}/{bgreen}' + self.tag + '{crst}]{rst} {bmagenta}Matched Pattern: ' + line[match.start():match.end()] + '{rst}', verbosity=2)
154 async with self.target.lock:
155 with open(os.path.join(self.target.scandir, '_patterns.log'), 'a') as file:
156 file.writelines('Matched Pattern: ' + line[match.start():match.end()] + '\n\n')
157
158 if self.outfile is not None:
159 with open(self.outfile, 'a') as writer:
160 writer.write(line + '\n')
161 self.lines.append(line)
162 self.ended = True
163
164 # Read a line from the stream cache.
165 async def readline(self):

Callers 1

executeMethod · 0.95

Calls 3

errorFunction · 0.85
infoFunction · 0.85
readlineMethod · 0.80

Tested by

no test coverage detected