MCPcopy
hub / github.com/wavetermdev/waveterm / RecvAck

Method RecvAck

pkg/streamclient/streamwriter.go:49–97  ·  view source on GitHub ↗
(ackPk wshrpc.CommandStreamAckData)

Source from the content-addressed store, hash-verified

47}
48
49func (w *Writer) RecvAck(ackPk wshrpc.CommandStreamAckData) {
50 w.lock.Lock()
51 defer w.lock.Unlock()
52
53 if ackPk.Id != w.id {
54 return
55 }
56
57 ackedSeq := ackPk.Seq
58 rwnd := ackPk.RWnd
59
60 if ackPk.Fin {
61 w.finAcked = true
62 w.maxAckedSeq = ackedSeq
63 return
64 }
65
66 if ackPk.Cancel && !w.canceled {
67 w.canceled = true
68 close(w.canceledChan)
69 if !w.closed {
70 w.err = fmt.Errorf("stream cancelled")
71 w.cond.Broadcast()
72 }
73 return
74 }
75
76 // Ignore stale ACKs using tuple comparison (seq, rwnd)
77 if ackedSeq < w.maxAckedSeq || (ackedSeq == w.maxAckedSeq && rwnd <= w.maxAckedRwnd) {
78 return
79 }
80
81 // Update max acked tuple
82 w.maxAckedSeq = ackedSeq
83 w.maxAckedRwnd = rwnd
84
85 if !w.closed {
86 if ackedSeq > (w.nextSeq - w.sentNotAcked) {
87 ackedBytes := ackedSeq - (w.nextSeq - w.sentNotAcked)
88 w.sentNotAcked -= ackedBytes
89 if w.sentNotAcked < 0 {
90 w.sentNotAcked = 0
91 }
92 }
93
94 w.readWindow = rwnd
95 w.cond.Broadcast()
96 }
97}
98
99func (w *Writer) GetAckState() (maxAckedSeq int64, finAcked bool, canceled bool) {
100 w.lock.Lock()

Callers 6

TestBasicReadWriteFunction · 0.95
TestEOFFunction · 0.95
TestFlowControlFunction · 0.95
TestErrorFunction · 0.95
TestCancelFunction · 0.95
TestMultipleWritesFunction · 0.95

Calls

no outgoing calls

Tested by 6

TestBasicReadWriteFunction · 0.76
TestEOFFunction · 0.76
TestFlowControlFunction · 0.76
TestErrorFunction · 0.76
TestCancelFunction · 0.76
TestMultipleWritesFunction · 0.76