| 110 | ) |
| 111 | |
| 112 | def unpack_message(self, data: bytes, from_client: bool) -> List[dns.DNSMessage]: |
| 113 | msgs: List[dns.DNSMessage] = [] |
| 114 | |
| 115 | buf = self.req_buf if from_client else self.resp_buf |
| 116 | |
| 117 | if self.context.client.transport_protocol == "udp": |
| 118 | msgs.append(dns.DNSMessage.unpack(data, timestamp=time.time())) |
| 119 | elif self.context.client.transport_protocol == "tcp": |
| 120 | buf.extend(data) |
| 121 | size = len(buf) |
| 122 | offset = 0 |
| 123 | |
| 124 | while True: |
| 125 | if size - offset < _LENGTH_LABEL.size: |
| 126 | break |
| 127 | (expected_size,) = _LENGTH_LABEL.unpack_from(buf, offset) |
| 128 | offset += _LENGTH_LABEL.size |
| 129 | if expected_size == 0: |
| 130 | raise struct.error("Message length field cannot be zero") |
| 131 | |
| 132 | if size - offset < expected_size: |
| 133 | offset -= _LENGTH_LABEL.size |
| 134 | break |
| 135 | |
| 136 | data = bytes(buf[offset : expected_size + offset]) |
| 137 | offset += expected_size |
| 138 | msgs.append(dns.DNSMessage.unpack(data, timestamp=time.time())) |
| 139 | |
| 140 | del buf[:offset] |
| 141 | return msgs |
| 142 | |
| 143 | @expect(events.Start) |
| 144 | def state_start(self, _) -> layer.CommandGenerator[None]: |