Converts the buffer from a given offset into a DNS message and also returns its length.
(
cls, buffer: bytes | bytearray, offset: int, timestamp: float | None = None
)
| 376 | |
| 377 | @classmethod |
| 378 | def unpack_from( |
| 379 | cls, buffer: bytes | bytearray, offset: int, timestamp: float | None = None |
| 380 | ) -> tuple[int, DNSMessage]: |
| 381 | """Converts the buffer from a given offset into a DNS message and also returns its length.""" |
| 382 | ( |
| 383 | id, |
| 384 | flags, |
| 385 | len_questions, |
| 386 | len_answers, |
| 387 | len_authorities, |
| 388 | len_additionals, |
| 389 | ) = DNSMessage.HEADER.unpack_from(buffer, offset) |
| 390 | msg = DNSMessage( |
| 391 | timestamp=timestamp, |
| 392 | id=id, |
| 393 | query=(flags & (1 << 15)) == 0, |
| 394 | op_code=(flags >> 11) & 0b1111, |
| 395 | authoritative_answer=(flags & (1 << 10)) != 0, |
| 396 | truncation=(flags & (1 << 9)) != 0, |
| 397 | recursion_desired=(flags & (1 << 8)) != 0, |
| 398 | recursion_available=(flags & (1 << 7)) != 0, |
| 399 | reserved=(flags >> 4) & 0b111, |
| 400 | response_code=flags & 0b1111, |
| 401 | questions=[], |
| 402 | answers=[], |
| 403 | authorities=[], |
| 404 | additionals=[], |
| 405 | ) |
| 406 | offset += DNSMessage.HEADER.size |
| 407 | cached_names = domain_names.cache() |
| 408 | |
| 409 | def unpack_domain_name() -> str: |
| 410 | nonlocal buffer, offset, cached_names |
| 411 | name, length = domain_names.unpack_from_with_compression( |
| 412 | buffer, offset, cached_names |
| 413 | ) |
| 414 | offset += length |
| 415 | return name |
| 416 | |
| 417 | for i in range(0, len_questions): |
| 418 | try: |
| 419 | name = unpack_domain_name() |
| 420 | type, class_ = Question.HEADER.unpack_from(buffer, offset) |
| 421 | offset += Question.HEADER.size |
| 422 | msg.questions.append(Question(name=name, type=type, class_=class_)) |
| 423 | except struct.error as e: |
| 424 | raise struct.error(f"question #{i}: {e}") |
| 425 | |
| 426 | def unpack_rrs( |
| 427 | section: list[ResourceRecord], section_name: str, count: int |
| 428 | ) -> None: |
| 429 | nonlocal buffer, offset |
| 430 | for i in range(0, count): |
| 431 | try: |
| 432 | name = unpack_domain_name() |
| 433 | type, class_, ttl, len_data = ResourceRecord.HEADER.unpack_from( |
| 434 | buffer, offset |
| 435 | ) |
no test coverage detected