| 98 | } |
| 99 | |
| 100 | func (proxy *UDPProxy) replyLoop(cte *connTrackEntry, serverAddr net.IP, clientAddr *net.UDPAddr, clientKey *connTrackKey) { |
| 101 | defer func() { |
| 102 | proxy.connTrackLock.Lock() |
| 103 | delete(proxy.connTrackTable, *clientKey) |
| 104 | cte.mu.Lock() |
| 105 | proxy.connTrackLock.Unlock() |
| 106 | cte.conn.Close() |
| 107 | }() |
| 108 | |
| 109 | var oob []byte |
| 110 | if proxy.ipVer == ip4 { |
| 111 | cm := &ipv4.ControlMessage{Src: serverAddr} |
| 112 | oob = cm.Marshal() |
| 113 | } else { |
| 114 | cm := &ipv6.ControlMessage{Src: serverAddr} |
| 115 | oob = cm.Marshal() |
| 116 | } |
| 117 | |
| 118 | readBuf := make([]byte, UDPBufSize) |
| 119 | for { |
| 120 | cte.conn.SetReadDeadline(time.Now().Add(proxy.connTrackTimeout)) |
| 121 | again: |
| 122 | read, err := cte.conn.Read(readBuf) |
| 123 | if err != nil { |
| 124 | if errors.Is(err, syscall.ECONNREFUSED) { |
| 125 | // This will happen if the last write failed |
| 126 | // (e.g: nothing is actually listening on the |
| 127 | // proxied port on the container), ignore it |
| 128 | // and continue until DefaultConnTrackTimeout |
| 129 | // expires: |
| 130 | goto again |
| 131 | } |
| 132 | // If the UDP connection is one-sided (i.e. the backend never sends |
| 133 | // replies), the connTrackEntry should not be GC'd until no writes |
| 134 | // happen for proxy.connTrackTimeout. |
| 135 | // |
| 136 | // Since the ReadDeadline is set to proxy.connTrackTimeout, in such |
| 137 | // case, the connTrackEntry will be GC'd at most after 2 * proxy.connTrackTimeout. |
| 138 | if errors.Is(err, os.ErrDeadlineExceeded) && time.Since(cte.lastWrite()) < proxy.connTrackTimeout { |
| 139 | continue |
| 140 | } |
| 141 | return |
| 142 | } |
| 143 | for i := 0; i != read; { |
| 144 | written, _, err := proxy.listener.WriteMsgUDP(readBuf[i:read], oob, clientAddr) |
| 145 | if err != nil { |
| 146 | return |
| 147 | } |
| 148 | i += written |
| 149 | } |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | // Run starts forwarding the traffic using UDP. |
| 154 | func (proxy *UDPProxy) Run() { |