MCPcopy
hub / github.com/wavetermdev/waveterm / RecvAck

Method RecvAck

pkg/jobmanager/streammanager.go:192–248  ·  view source on GitHub ↗

RecvAck processes an ACK from the client must be connected, and streamid must match

(ackPk wshrpc.CommandStreamAckData)

Source from the content-addressed store, hash-verified

190// RecvAck processes an ACK from the client
191// must be connected, and streamid must match
192func (sm *StreamManager) RecvAck(ackPk wshrpc.CommandStreamAckData) {
193 sm.lock.Lock()
194 defer sm.lock.Unlock()
195
196 if !sm.connected || ackPk.Id != sm.streamId {
197 return
198 }
199
200 if ackPk.Fin {
201 sm.terminalEventAcked = true
202 sm.drainCond.Signal()
203 return
204 }
205
206 seq := ackPk.Seq
207 rwnd := ackPk.RWnd
208
209 // Ignore stale ACKs using tuple comparison (seq, rwnd)
210 if seq < sm.maxAckedSeq || (seq == sm.maxAckedSeq && rwnd <= sm.maxAckedRwnd) {
211 // log.Printf("streammanager ignoring stale ACK: seq=%d rwnd=%d (max: seq=%d rwnd=%d)",
212 // seq, rwnd, sm.maxAckedSeq, sm.maxAckedRwnd)
213 return
214 }
215
216 // Update max acked tuple
217 sm.maxAckedSeq = seq
218 sm.maxAckedRwnd = rwnd
219
220 headPos := sm.buf.HeadPos()
221 if seq < headPos {
222 return
223 }
224
225 ackedBytes := seq - headPos
226 if ackedBytes > sm.sentNotAcked {
227 return
228 }
229
230 if ackedBytes > 0 {
231 if err := sm.buf.Consume(int(ackedBytes)); err != nil {
232 return
233 }
234 sm.sentNotAcked -= ackedBytes
235 }
236
237 prevRwnd := sm.rwndSize
238 sm.rwndSize = int(ackPk.RWnd)
239 effectiveWindow := sm.cwndSize
240 if sm.rwndSize < effectiveWindow {
241 effectiveWindow = sm.rwndSize
242 }
243 sm.buf.SetEffectiveWindow(true, effectiveWindow)
244
245 if sm.rwndSize > prevRwnd || ackedBytes > 0 {
246 sm.drainCond.Signal()
247 }
248}
249

Callers

nothing calls this directly

Calls 3

HeadPosMethod · 0.80
ConsumeMethod · 0.80
SetEffectiveWindowMethod · 0.80

Tested by

no test coverage detected