(tctx, transport_protocol)
| 208 | |
| 209 | @pytest.mark.parametrize("transport_protocol", ["tcp", "udp"]) |
| 210 | def test_reverse(tctx, transport_protocol): |
| 211 | tctx.client.transport_protocol = transport_protocol |
| 212 | tctx.server.transport_protocol = transport_protocol |
| 213 | |
| 214 | f = Placeholder(DNSFlow) |
| 215 | layer = dns.DNSLayer(tctx) |
| 216 | layer.context.server.address = ("8.8.8.8", 53) |
| 217 | |
| 218 | req = tdnsreq() |
| 219 | resp = tdnsresp() |
| 220 | |
| 221 | assert ( |
| 222 | Playbook(layer) |
| 223 | >> DataReceived( |
| 224 | tctx.client, dns.pack_message(req, tctx.client.transport_protocol) |
| 225 | ) |
| 226 | << dns.DnsRequestHook(f) |
| 227 | >> reply() |
| 228 | << OpenConnection(tctx.server) |
| 229 | >> reply(None) |
| 230 | << SendData(tctx.server, dns.pack_message(req, tctx.server.transport_protocol)) |
| 231 | >> DataReceived( |
| 232 | tctx.server, dns.pack_message(resp, tctx.server.transport_protocol) |
| 233 | ) |
| 234 | << dns.DnsResponseHook(f) |
| 235 | >> reply() |
| 236 | << SendData(tctx.client, dns.pack_message(resp, tctx.client.transport_protocol)) |
| 237 | >> ConnectionClosed(tctx.client) |
| 238 | << CloseConnection(tctx.server) |
| 239 | << None |
| 240 | ) |
| 241 | assert f().request |
| 242 | assert f().response |
| 243 | assert not f().live |
| 244 | req.timestamp = f().request.timestamp |
| 245 | resp.timestamp = f().response.timestamp |
| 246 | assert f().request == req and f().response == resp |
| 247 | |
| 248 | |
| 249 | @pytest.mark.parametrize("transport_protocol", ["tcp", "udp"]) |
nothing calls this directly
no test coverage detected
searching dependent graphs…