hub / github.com/secdev/scapy / dns_resolve

Function dns_resolve

scapy/layers/dns.py:1395–1490

Perform a simple DNS resolution using conf.nameservers, with caching.

(qname, qtype="A", raw=False, tcp=False, verbose=1, timeout=3, **kwargs)
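
The description mentions caching, and the signature exposes `raw`. In the source listed on this page the two interact: `raw` becomes part of the cache key, so a full-packet lookup and an answers-only lookup for the same name are cached separately. The sketch below reproduces just that key construction with the standard library. `make_cache_ident` is a hypothetical helper name, and the sketch assumes `qname` has already been normalised to bytes and `qtype` to its numeric code (1 for A), which is what the two `any2i` calls in the listing appear to do.

```python
import struct


def make_cache_ident(qname: bytes, qtype: int, raw: bool = False) -> bytes:
    """Build a cache key from name, type and the raw flag (hypothetical helper)."""
    # "!B" packs qtype as one unsigned byte, mirroring the listing.
    parts = [qname, struct.pack("!B", qtype)]
    if raw:
        # Full-packet results get a distinct key from answers-only results.
        parts.append(b"raw")
    return b";".join(parts)


# Illustrative values only: qtype 1 is the A record type.
key_answers = make_cache_ident(b"example.com.", 1)
key_packet = make_cache_ident(b"example.com.", 1, raw=True)
```

Because the type is packed with `"!B"`, this construction only accepts type codes from 0 to 255; `struct.pack` raises `struct.error` for anything larger.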

Source from the content-addressed store, hash-verified

@conf.commands.register
def dns_resolve(qname, qtype="A", raw=False, tcp=False, verbose=1, timeout=3, **kwargs):
    """
    Perform a simple DNS resolution using conf.nameservers with caching

    :param qname: the name to query
    :param qtype: the type to query (default A)
    :param raw: return the whole DNS packet (default False)
    :param tcp: whether to use directly TCP instead of UDP. If truncated is received,
        UDP automatically retries in TCP. (default: False)
    :param verbose: show verbose errors
    :param timeout: seconds until timeout (per server)
    :raise TimeoutError: if no DNS servers were reached in time.
    """
    # Unify types (for caching)
    qtype = DNSQR.qtype.any2i_one(None, qtype)
    qname = DNSQR.qname.any2i(None, qname)
    # Check cache
    cache_ident = b";".join(
        [qname, struct.pack("!B", qtype)] +
        ([b"raw"] if raw else [])
    )
    result = _dns_cache.get(cache_ident)
    if result:
        return result

    kwargs.setdefault("timeout", timeout)
    kwargs.setdefault("verbose", 0)  # hide sr1() output
    res = None
    for nameserver in conf.nameservers:
        # Try all nameservers
        try:
            # Spawn a socket, connect to the nameserver on port 53
            if tcp:
                cls = DNSTCP
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            else:
                cls = DNS
                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(kwargs["timeout"])
            sock.connect((nameserver, 53))
            # Connected. Wrap it with DNS
            sock = StreamSocket(sock, cls)
            # I/O
            res = sock.sr1(
                cls(qd=[DNSQR(qname=qname, qtype=qtype)], id=RandShort()),
                **kwargs,
            )
        except IOError as ex:
            if verbose:
                log_runtime.warning(str(ex))
            continue
        finally:
            sock.close()
        if res:
            # We have a response ! Check for failure
            if res[DNS].tc == 1:  # truncated !
                if not tcp:
                    # Retry using TCP
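
The listing breaks off at the truncation check. As a library-independent illustration of what `res[DNS].tc == 1` tests, the sketch below reads the TC flag directly from a raw DNS message header, using the RFC 1035 header layout (16-bit ID followed by a 16-bit flags field). `is_truncated`, `TC_MASK` and the sample headers are hypothetical names introduced for this example; none of them are part of scapy, and the sketch does not show how scapy performs the retry.

```python
import struct

TC_MASK = 0x0200  # TC (truncation) bit within the 16-bit DNS flags field


def is_truncated(message: bytes) -> bool:
    """Return True if the TC bit is set in a raw DNS message header."""
    if len(message) < 12:
        raise ValueError("a DNS header is 12 bytes long")
    # The header starts with a 16-bit ID and 16-bit flags, both big-endian.
    _msg_id, flags = struct.unpack("!HH", message[:4])
    return bool(flags & TC_MASK)


# Hypothetical response headers: ID 0x1234, QR=1, RD=1, RA=1, one question.
truncated_header = struct.pack("!HHHHHH", 0x1234, 0x8380, 1, 0, 0, 0)  # TC set
complete_header = struct.pack("!HHHHHH", 0x1234, 0x8180, 1, 0, 0, 0)   # TC clear
```

A client following the behaviour described in the docstring would repeat the query over TCP when such a check succeeds on a UDP response.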

Callers 2

dclocator · Function · 0.90
make_reply · Method · 0.85

Calls 10

StreamSocket · Class · 0.90
RandShort · Class · 0.90
DNSQR · Class · 0.85
join · Method · 0.80
any2i_one · Method · 0.45
any2i · Method · 0.45
get · Method · 0.45
connect · Method · 0.45
sr1 · Method · 0.45
close · Method · 0.45

Tested by

no test coverage detected