(
self, flow: dns.DNSFlow, msg: dns.DNSMessage
)
| 69 | self.resp_buf = bytearray() |
| 70 | |
| 71 | def handle_request( |
| 72 | self, flow: dns.DNSFlow, msg: dns.DNSMessage |
| 73 | ) -> layer.CommandGenerator[None]: |
| 74 | flow.request = msg # if already set, continue and query upstream again |
| 75 | yield DnsRequestHook(flow) |
| 76 | if flow.response: |
| 77 | yield from self.handle_response(flow, flow.response) |
| 78 | elif flow.error: |
| 79 | yield from self.handle_error(flow, flow.error.msg) |
| 80 | elif not self.context.server.address: |
| 81 | yield from self.handle_error( |
| 82 | flow, "No hook has set a response and there is no upstream server." |
| 83 | ) |
| 84 | else: |
| 85 | if not self.context.server.connected: |
| 86 | err = yield commands.OpenConnection(self.context.server) |
| 87 | if err: |
| 88 | yield from self.handle_error(flow, str(err)) |
| 89 | # cannot recover from this |
| 90 | return |
| 91 | packed = pack_message(flow.request, flow.server_conn.transport_protocol) |
| 92 | yield commands.SendData(self.context.server, packed) |
| 93 | |
| 94 | def handle_response( |
| 95 | self, flow: dns.DNSFlow, msg: dns.DNSMessage |
no test coverage detected