MCPcopy
hub / github.com/mitmproxy/mitmproxy / unpack_from

Method unpack_from

mitmproxy/dns.py:378–457  ·  view source on GitHub ↗

Parses the buffer, starting at the given offset, into a DNS message and returns the message's length together with the parsed message.

(
        cls, buffer: bytes | bytearray, offset: int, timestamp: float | None = None
    )

Source from the content-addressed store, hash-verified

376
377 @classmethod
378 def unpack_from(
379 cls, buffer: bytes | bytearray, offset: int, timestamp: float | None = None
380 ) -> tuple[int, DNSMessage]:
381 """Converts the buffer from a given offset into a DNS message and also returns its length."""
382 (
383 id,
384 flags,
385 len_questions,
386 len_answers,
387 len_authorities,
388 len_additionals,
389 ) = DNSMessage.HEADER.unpack_from(buffer, offset)
390 msg = DNSMessage(
391 timestamp=timestamp,
392 id=id,
393 query=(flags & (1 << 15)) == 0,
394 op_code=(flags >> 11) & 0b1111,
395 authoritative_answer=(flags & (1 << 10)) != 0,
396 truncation=(flags & (1 << 9)) != 0,
397 recursion_desired=(flags & (1 << 8)) != 0,
398 recursion_available=(flags & (1 << 7)) != 0,
399 reserved=(flags >> 4) & 0b111,
400 response_code=flags & 0b1111,
401 questions=[],
402 answers=[],
403 authorities=[],
404 additionals=[],
405 )
406 offset += DNSMessage.HEADER.size
407 cached_names = domain_names.cache()
408
409 def unpack_domain_name() -> str:
410 nonlocal buffer, offset, cached_names
411 name, length = domain_names.unpack_from_with_compression(
412 buffer, offset, cached_names
413 )
414 offset += length
415 return name
416
417 for i in range(0, len_questions):
418 try:
419 name = unpack_domain_name()
420 type, class_ = Question.HEADER.unpack_from(buffer, offset)
421 offset += Question.HEADER.size
422 msg.questions.append(Question(name=name, type=type, class_=class_))
423 except struct.error as e:
424 raise struct.error(f"question #{i}: {e}")
425
426 def unpack_rrs(
427 section: list[ResourceRecord], section_name: str, count: int
428 ) -> None:
429 nonlocal buffer, offset
430 for i in range(0, count):
431 try:
432 name = unpack_domain_name()
433 type, class_, ttl, len_data = ResourceRecord.HEADER.unpack_from(
434 buffer, offset
435 )

Callers 8

unpack — Method · 0.80
unpack_rrs — Method · 0.80
unpack_message — Method · 0.80
original_addr — Function · 0.80
unpack — Function · 0.80
_unpack_label_into — Function · 0.80
unpack_from — Function · 0.80

Calls 4

Question — Class · 0.85
DNSMessage — Class · 0.70
append — Method · 0.45
error — Method · 0.45

Tested by

no test coverage detected