Create a DNS flow for testing.
(
*,
client_conn: connection.Client | None = None,
server_conn: connection.Server | None = None,
req: dns.DNSMessage | None = None,
resp: bool | dns.DNSMessage = False,
err: bool | flow.Error = False,
live: bool = True,
)
| 121 | |
| 122 | |
def tdnsflow(
    *,
    client_conn: connection.Client | None = None,
    server_conn: connection.Server | None = None,
    req: dns.DNSMessage | None = None,
    resp: bool | dns.DNSMessage = False,
    err: bool | flow.Error = False,
    live: bool = True,
) -> dns.DNSFlow:
    """
    Build a `dns.DNSFlow` for use in tests.

    All arguments are keyword-only.

    Args:
        client_conn: Client connection to attach. When `None`, one is obtained
            from `tclient_conn()` and configured with the "dns" proxy mode and
            "udp" transport. A caller-supplied connection is used as-is.
        server_conn: Server connection to attach. When `None`, one is obtained
            from `tserver_conn()` and set to "udp" transport. A caller-supplied
            connection is used as-is.
        req: Request message. When `None`, `tdnsreq()` provides it.
        resp: `True` to use `tdnsresp()`, `False` for no response, or an
            explicit `dns.DNSMessage`.
        err: `True` to use `terr()`, `False` for no error, or an explicit
            `flow.Error`.
        live: Value stored on the flow's `live` attribute.

    Returns:
        The assembled flow, with `timestamp_created` taken from the request's
        timestamp, or from `time.time()` when that timestamp is falsy.
    """
    client = client_conn
    if client is None:
        # Only the default fixture is reconfigured for DNS-over-UDP.
        client = tclient_conn()
        client.proxy_mode = ProxyMode.parse("dns")
        client.transport_protocol = "udp"

    server = server_conn
    if server is None:
        server = tserver_conn()
        server.transport_protocol = "udp"

    request = tdnsreq() if req is None else req

    # A literal `True` is shorthand for "use the stock fixture".
    response = tdnsresp() if resp is True else resp
    error = terr() if err is True else err

    # Past this point each value is either `False` or a concrete object.
    assert response is False or isinstance(response, dns.DNSMessage)
    assert error is False or isinstance(error, flow.Error)

    dns_flow = dns.DNSFlow(client, server)

    created = request.timestamp
    if not created:
        created = time.time()
    dns_flow.timestamp_created = created

    dns_flow.request = request
    # Falsy values (notably `False`) are stored as `None` on the flow.
    dns_flow.response = response if response else None
    dns_flow.error = error if error else None
    dns_flow.live = live
    return dns_flow
| 158 | |
| 159 | |
| 160 | def tflow( |
no test coverage detected
searching dependent graphs…