(readFromSocket, otherSocket batching.Conn, readFromSocketIsIPv4 bool)
| 863 | } |
| 864 | |
| 865 | func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readFromSocketIsIPv4 bool) { |
| 866 | defer func() { |
| 867 | // We intentionally close the [Server] if we encounter a socket read |
| 868 | // error below, at least until socket "re-binding" is implemented as |
| 869 | // part of http://go/corp/30118. |
| 870 | // |
| 871 | // Decrementing this [sync.WaitGroup] _before_ calling [Server.Close] is |
| 872 | // intentional as [Server.Close] waits on it. |
| 873 | s.wg.Done() |
| 874 | s.Close() |
| 875 | }() |
| 876 | |
| 877 | msgs := make([]ipv6.Message, batching.IdealBatchSize) |
| 878 | for i := range msgs { |
| 879 | msgs[i].OOB = make([]byte, batching.MinControlMessageSize()) |
| 880 | msgs[i].Buffers = make([][]byte, 1) |
| 881 | msgs[i].Buffers[0] = make([]byte, 1<<16-1) |
| 882 | } |
| 883 | writeBuffsByDest := make(map[netip.AddrPort][][]byte, batching.IdealBatchSize) |
| 884 | |
| 885 | for { |
| 886 | for i := range msgs { |
| 887 | msgs[i] = ipv6.Message{Buffers: msgs[i].Buffers, OOB: msgs[i].OOB[:cap(msgs[i].OOB)]} |
| 888 | } |
| 889 | |
| 890 | // TODO: extract laddr from IP_PKTINFO for use in reply |
| 891 | // ReadBatch will split coalesced datagrams before returning, which |
| 892 | // WriteBatchTo will re-coalesce further down. We _could_ be more |
| 893 | // efficient and not split datagrams that belong to the same VNI if they |
| 894 | // are non-control/handshake packets. We pay the memmove/memcopy |
| 895 | // performance penalty for now in the interest of simple single packet |
| 896 | // handlers. |
| 897 | n, err := readFromSocket.ReadBatch(msgs, 0) |
| 898 | if err != nil { |
| 899 | s.logf("error reading from socket(%v): %v", readFromSocket.LocalAddr(), err) |
| 900 | return |
| 901 | } |
| 902 | |
| 903 | // Aggregate counts for the packet batch before writing metrics. |
| 904 | forwardedByOutAF := struct { |
| 905 | bytes4 int64 |
| 906 | packets4 int64 |
| 907 | bytes6 int64 |
| 908 | packets6 int64 |
| 909 | }{} |
| 910 | for _, msg := range msgs[:n] { |
| 911 | if msg.N == 0 { |
| 912 | continue |
| 913 | } |
| 914 | buf := msg.Buffers[0][:msg.N] |
| 915 | from := msg.Addr.(*net.UDPAddr).AddrPort() |
| 916 | write, to, isDataPacket := s.handlePacket(from, buf) |
| 917 | if !to.IsValid() { |
| 918 | continue |
| 919 | } |
| 920 | if isDataPacket { |
| 921 | if to.Addr().Is4() { |
| 922 | forwardedByOutAF.bytes4 += int64(len(write)) |
no test coverage detected