MCPcopy Index your code
hub / github.com/mitmproxy/mitmproxy / tdnsflow

Function tdnsflow

mitmproxy/test/tflow.py:123–157  ·  view source on GitHub ↗

Create a DNS flow for testing.

(
    *,
    client_conn: connection.Client | None = None,
    server_conn: connection.Server | None = None,
    req: dns.DNSMessage | None = None,
    resp: bool | dns.DNSMessage = False,
    err: bool | flow.Error = False,
    live: bool = True,
)

Source from the content-addressed store, hash-verified

121
122
123def tdnsflow(
124 *,
125 client_conn: connection.Client | None = None,
126 server_conn: connection.Server | None = None,
127 req: dns.DNSMessage | None = None,
128 resp: bool | dns.DNSMessage = False,
129 err: bool | flow.Error = False,
130 live: bool = True,
131) -> dns.DNSFlow:
132 """Create a DNS flow for testing."""
133 if client_conn is None:
134 client_conn = tclient_conn()
135 client_conn.proxy_mode = ProxyMode.parse("dns")
136 client_conn.transport_protocol = "udp"
137 if server_conn is None:
138 server_conn = tserver_conn()
139 server_conn.transport_protocol = "udp"
140 if req is None:
141 req = tdnsreq()
142
143 if resp is True:
144 resp = tdnsresp()
145 if err is True:
146 err = terr()
147
148 assert resp is False or isinstance(resp, dns.DNSMessage)
149 assert err is False or isinstance(err, flow.Error)
150
151 f = dns.DNSFlow(client_conn, server_conn)
152 f.timestamp_created = req.timestamp or time.time()
153 f.request = req
154 f.response = resp or None
155 f.error = err or None
156 f.live = live
157 return f
158
159
160def tflow(

Callers 1

tflowsFunction · 0.85

Calls 6

tdnsreqFunction · 0.90
tdnsrespFunction · 0.90
tclient_connFunction · 0.85
tserver_connFunction · 0.85
terrFunction · 0.85
parseMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…