| 825 | |
| 826 | |
| 827 | class UDP(Packet): |
| 828 | name = "UDP" |
| 829 | fields_desc = [ShortEnumField("sport", 53, UDP_SERVICES), |
| 830 | ShortEnumField("dport", 53, UDP_SERVICES), |
| 831 | ShortField("len", None), |
| 832 | XShortField("chksum", None), ] |
| 833 | |
| 834 | def post_build(self, p, pay): |
| 835 | p += pay |
| 836 | tmp_len = self.len |
| 837 | if tmp_len is None: |
| 838 | tmp_len = len(p) |
| 839 | p = p[:4] + struct.pack("!H", tmp_len) + p[6:] |
| 840 | if self.chksum is None: |
| 841 | if isinstance(self.underlayer, IP): |
| 842 | ck = in4_chksum(socket.IPPROTO_UDP, self.underlayer, p) |
| 843 | # According to RFC768 if the result checksum is 0, it should be set to 0xFFFF # noqa: E501 |
| 844 | if ck == 0: |
| 845 | ck = 0xFFFF |
| 846 | p = p[:6] + struct.pack("!H", ck) + p[8:] |
| 847 | elif isinstance(self.underlayer, scapy.layers.inet6.IPv6) or isinstance(self.underlayer, scapy.layers.inet6._IPv6ExtHdr): # noqa: E501 |
| 848 | ck = scapy.layers.inet6.in6_chksum(socket.IPPROTO_UDP, self.underlayer, p) # noqa: E501 |
| 849 | # According to RFC2460 if the result checksum is 0, it should be set to 0xFFFF # noqa: E501 |
| 850 | if ck == 0: |
| 851 | ck = 0xFFFF |
| 852 | p = p[:6] + struct.pack("!H", ck) + p[8:] |
| 853 | else: |
| 854 | log_runtime.info( |
| 855 | "No IP underlayer to compute checksum. Leaving null." |
| 856 | ) |
| 857 | return p |
| 858 | |
| 859 | def extract_padding(self, s): |
| 860 | tmp_len = self.len - 8 |
| 861 | return s[:tmp_len], s[tmp_len:] |
| 862 | |
| 863 | def hashret(self): |
| 864 | return self.payload.hashret() |
| 865 | |
| 866 | def answers(self, other): |
| 867 | if not isinstance(other, UDP): |
| 868 | return 0 |
| 869 | if conf.checkIPsrc: |
| 870 | if self.dport != other.sport: |
| 871 | return 0 |
| 872 | return self.payload.answers(other.payload) |
| 873 | |
| 874 | def mysummary(self): |
| 875 | if isinstance(self.underlayer, IP): |
| 876 | return self.underlayer.sprintf("UDP %IP.src%:%UDP.sport% > %IP.dst%:%UDP.dport%") # noqa: E501 |
| 877 | elif isinstance(self.underlayer, scapy.layers.inet6.IPv6): |
| 878 | return self.underlayer.sprintf("UDP %IPv6.src%:%UDP.sport% > %IPv6.dst%:%UDP.dport%") # noqa: E501 |
| 879 | else: |
| 880 | return self.sprintf("UDP %UDP.sport% > %UDP.dport%") |
| 881 | |
| 882 | |
| 883 | # RFC 4884 ICMP extensions |
no test coverage detected