RecvAck processes an ACK from the client must be connected, and streamid must match
(ackPk wshrpc.CommandStreamAckData)
| 190 | // RecvAck processes an ACK from the client |
| 191 | // must be connected, and streamid must match |
| 192 | func (sm *StreamManager) RecvAck(ackPk wshrpc.CommandStreamAckData) { |
| 193 | sm.lock.Lock() |
| 194 | defer sm.lock.Unlock() |
| 195 | |
| 196 | if !sm.connected || ackPk.Id != sm.streamId { |
| 197 | return |
| 198 | } |
| 199 | |
| 200 | if ackPk.Fin { |
| 201 | sm.terminalEventAcked = true |
| 202 | sm.drainCond.Signal() |
| 203 | return |
| 204 | } |
| 205 | |
| 206 | seq := ackPk.Seq |
| 207 | rwnd := ackPk.RWnd |
| 208 | |
| 209 | // Ignore stale ACKs using tuple comparison (seq, rwnd) |
| 210 | if seq < sm.maxAckedSeq || (seq == sm.maxAckedSeq && rwnd <= sm.maxAckedRwnd) { |
| 211 | // log.Printf("streammanager ignoring stale ACK: seq=%d rwnd=%d (max: seq=%d rwnd=%d)", |
| 212 | // seq, rwnd, sm.maxAckedSeq, sm.maxAckedRwnd) |
| 213 | return |
| 214 | } |
| 215 | |
| 216 | // Update max acked tuple |
| 217 | sm.maxAckedSeq = seq |
| 218 | sm.maxAckedRwnd = rwnd |
| 219 | |
| 220 | headPos := sm.buf.HeadPos() |
| 221 | if seq < headPos { |
| 222 | return |
| 223 | } |
| 224 | |
| 225 | ackedBytes := seq - headPos |
| 226 | if ackedBytes > sm.sentNotAcked { |
| 227 | return |
| 228 | } |
| 229 | |
| 230 | if ackedBytes > 0 { |
| 231 | if err := sm.buf.Consume(int(ackedBytes)); err != nil { |
| 232 | return |
| 233 | } |
| 234 | sm.sentNotAcked -= ackedBytes |
| 235 | } |
| 236 | |
| 237 | prevRwnd := sm.rwndSize |
| 238 | sm.rwndSize = int(ackPk.RWnd) |
| 239 | effectiveWindow := sm.cwndSize |
| 240 | if sm.rwndSize < effectiveWindow { |
| 241 | effectiveWindow = sm.rwndSize |
| 242 | } |
| 243 | sm.buf.SetEffectiveWindow(true, effectiveWindow) |
| 244 | |
| 245 | if sm.rwndSize > prevRwnd || ackedBytes > 0 { |
| 246 | sm.drainCond.Signal() |
| 247 | } |
| 248 | } |
| 249 |
nothing calls this directly
no test coverage detected