MCPcopy
hub / github.com/wavetermdev/waveterm / ClientConnected

Method ClientConnected

pkg/jobmanager/streammanager.go:97–146  ·  view source on GitHub ↗

ClientConnected transitions to CONNECTED mode

(streamId string, dataSender DataSender, rwndSize int, clientSeq int64)

Source from the content-addressed store, hash-verified

95
96// ClientConnected transitions to CONNECTED mode
97func (sm *StreamManager) ClientConnected(streamId string, dataSender DataSender, rwndSize int, clientSeq int64) (int64, error) {
98 sm.lock.Lock()
99 defer sm.lock.Unlock()
100
101 if sm.closed || sm.terminalEventAcked {
102 return 0, fmt.Errorf("stream is closed")
103 }
104
105 if sm.connected {
106 return 0, fmt.Errorf("client already connected")
107 }
108
109 if dataSender == nil {
110 return 0, fmt.Errorf("dataSender cannot be nil")
111 }
112
113 headPos := sm.buf.HeadPos()
114 if clientSeq > headPos {
115 bytesToConsume := int(clientSeq - headPos)
116 available := sm.buf.Size()
117 if bytesToConsume > available {
118 return 0, fmt.Errorf("client seq %d is beyond our stream end (head=%d, size=%d)", clientSeq, headPos, available)
119 }
120 if bytesToConsume > 0 {
121 if err := sm.buf.Consume(bytesToConsume); err != nil {
122 return 0, fmt.Errorf("failed to consume buffer: %w", err)
123 }
124 headPos = sm.buf.HeadPos()
125 }
126 }
127
128 sm.streamId = streamId
129 sm.dataSender = dataSender
130 sm.connected = true
131 sm.rwndSize = rwndSize
132 sm.sentNotAcked = 0
133 effectiveWindow := sm.cwndSize
134 if sm.rwndSize < effectiveWindow {
135 effectiveWindow = sm.rwndSize
136 }
137 sm.buf.SetEffectiveWindow(true, effectiveWindow)
138 sm.drainCond.Signal()
139
140 startSeq := headPos
141 if clientSeq > startSeq {
142 startSeq = clientSeq
143 }
144
145 return startSeq, nil
146}
147
148// GetStreamId returns the current stream ID (safe to call with lock held by caller)
149func (sm *StreamManager) GetStreamId() string {

Calls 4

HeadPosMethod · 0.80
ConsumeMethod · 0.80
SetEffectiveWindowMethod · 0.80
SizeMethod · 0.45