MCPcopy
hub / github.com/fortra/impacket / parseCompressedMessage

Method parseCompressedMessage

impacket/dns.py:317–344  ·  view source on GitHub ↗

Parse a compressed message as defined in RFC 1035, section 4.1.4.

(self, buf, offset=0)

Source from the content-addressed store, hash-verified

315 return (aux, offset)
316
def parseCompressedMessage(self, buf, offset=0):
    """Parse a (possibly compressed) domain name as defined in RFC 1035, section 4.1.4.

    Labels are read starting at ``offset``. Compression pointers are followed
    iteratively, so neither a long run of labels nor a chain of pointers can
    exhaust the interpreter's recursion limit, and pointer cycles of any
    length are detected instead of looping.

    Args:
        buf: Byte string to read from. Every pointer target is reduced by
            ``self.__HEADER_BASE_SIZE`` before being used as an index into
            ``buf``.
            NOTE(review): presumably ``buf`` is the message body without
            its header -- confirm against the callers.
        offset: Index in ``buf`` at which the name starts.

    Returns:
        A tuple ``(offset, name)``. ``offset`` is the index just past the
        name as stored at the starting position: after the terminating zero
        byte, or after the first pointer when one is encountered. ``name``
        is the labels joined with ``b"."``, or ``''`` (a ``str``, kept for
        backward compatibility) when the name has no labels.

    Raises:
        Exception: if the data ends before the name is terminated, or if a
            pointer targets its own offset, targets an offset that was
            already followed, or resolves to a negative index.
        struct.error: if the buffer ends in the middle of a two-byte pointer.
    """
    labels = []
    # Offset to hand back to the caller; fixed by the first pointer, because
    # everything read after a jump lives elsewhere in the buffer.
    end_offset = None
    # Pointer targets already followed; seeing one again means a cycle.
    seen_targets = set()

    while True:
        if offset >= len(buf):
            raise Exception("No more data to parse. Offset is bigger than length of buffer.")
        # Slice (not index) so the same code works on Python 2 and 3 byte strings.
        byte = struct.unpack("B", buf[offset:offset+1])[0]

        # If the first two bits are ones (11000000=0xC0), the remaining
        # 14 bits are the offset of the rest of the name.
        if byte & 0xC0 == 0xC0:
            pointer = struct.unpack("!H", buf[offset:offset+2])[0]  # network unsigned short
            pointer = (pointer & 0x3FFF) - self.__HEADER_BASE_SIZE
            if offset == pointer:
                raise Exception("The infinite loop is in DNS decompression. Encountered pointer points to the current offset.")
            if pointer < 0:
                # A negative index would silently read from the end of buf.
                raise Exception("Invalid DNS compression pointer. Pointer points before the start of the buffer.")
            if pointer in seen_targets:
                raise Exception("The infinite loop is in DNS decompression. Encountered pointer points to an already visited offset.")
            seen_targets.add(pointer)
            if end_offset is None:
                end_offset = offset + 2
            offset = pointer
            continue

        # It's a label: one length byte followed by that many bytes.
        offset += 1
        if byte == 0x00:
            # Zero-length label terminates the name.
            break
        labels.append(buf[offset:offset+byte])
        offset += byte

    if end_offset is None:
        end_offset = offset
    if not labels:
        return (end_offset, '')
    return (end_offset, b".".join(labels))
345
def get_answers(self):
    """Return the first element of the result of ``self.__get_answers()``."""
    parsed = self.__get_answers()
    return parsed[0]

Callers 3

__get_questionsMethod · 0.95
__get_questions_tcpMethod · 0.95

Calls 1

unpackMethod · 0.45

Tested by

no test coverage detected