(tctx, transport_protocol)
| 69 | |
| 70 | @pytest.mark.parametrize("transport_protocol", ["tcp", "udp"]) |
| 71 | def test_regular(tctx, transport_protocol): |
| 72 | tctx.client.transport_protocol = transport_protocol |
| 73 | tctx.server.transport_protocol = transport_protocol |
| 74 | |
| 75 | f = Placeholder(DNSFlow) |
| 76 | |
| 77 | req = tdnsreq() |
| 78 | resp = tdnsresp() |
| 79 | |
| 80 | def resolve(flow: DNSFlow): |
| 81 | nonlocal req, resp |
| 82 | assert flow.request |
| 83 | req.timestamp = flow.request.timestamp |
| 84 | assert flow.request == req |
| 85 | resp.timestamp = time.time() |
| 86 | flow.response = resp |
| 87 | |
| 88 | assert ( |
| 89 | Playbook(dns.DNSLayer(tctx)) |
| 90 | >> DataReceived(tctx.client, dns.pack_message(req, transport_protocol)) |
| 91 | << dns.DnsRequestHook(f) |
| 92 | >> reply(side_effect=resolve) |
| 93 | << dns.DnsResponseHook(f) |
| 94 | >> reply() |
| 95 | << SendData(tctx.client, dns.pack_message(resp, transport_protocol)) |
| 96 | >> ConnectionClosed(tctx.client) |
| 97 | << None |
| 98 | ) |
| 99 | assert f().request == req |
| 100 | assert f().response == resp |
| 101 | assert not f().live |
| 102 | |
| 103 | |
| 104 | @pytest.mark.parametrize("transport_protocol", ["tcp", "udp"]) |
nothing calls this directly
no test coverage detected
searching dependent graphs…