MCPcopy
hub / github.com/mitmproxy/mitmproxy / unpack_message

Method unpack_message

mitmproxy/proxy/layers/dns.py:112–141  ·  view source on GitHub ↗
(self, data: bytes, from_client: bool)

Source from the content-addressed store, hash-verified

110 )
111
112 def unpack_message(self, data: bytes, from_client: bool) -> List[dns.DNSMessage]:
113 msgs: List[dns.DNSMessage] = []
114
115 buf = self.req_buf if from_client else self.resp_buf
116
117 if self.context.client.transport_protocol == "udp":
118 msgs.append(dns.DNSMessage.unpack(data, timestamp=time.time()))
119 elif self.context.client.transport_protocol == "tcp":
120 buf.extend(data)
121 size = len(buf)
122 offset = 0
123
124 while True:
125 if size - offset < _LENGTH_LABEL.size:
126 break
127 (expected_size,) = _LENGTH_LABEL.unpack_from(buf, offset)
128 offset += _LENGTH_LABEL.size
129 if expected_size == 0:
130 raise struct.error("Message length field cannot be zero")
131
132 if size - offset < expected_size:
133 offset -= _LENGTH_LABEL.size
134 break
135
136 data = bytes(buf[offset : expected_size + offset])
137 offset += expected_size
138 msgs.append(dns.DNSMessage.unpack(data, timestamp=time.time()))
139
140 del buf[:offset]
141 return msgs
142
143 @expect(events.Start)
144 def state_start(self, _) -> layer.CommandGenerator[None]:

Callers 3

state_queryMethod · 0.95

Calls 5

unpackMethod · 0.80
unpack_fromMethod · 0.80
appendMethod · 0.45
extendMethod · 0.45
errorMethod · 0.45

Tested by 2