ClientConnected transitions to CONNECTED mode
(streamId string, dataSender DataSender, rwndSize int, clientSeq int64)
| 95 | |
| 96 | // ClientConnected transitions to CONNECTED mode |
| 97 | func (sm *StreamManager) ClientConnected(streamId string, dataSender DataSender, rwndSize int, clientSeq int64) (int64, error) { |
| 98 | sm.lock.Lock() |
| 99 | defer sm.lock.Unlock() |
| 100 | |
| 101 | if sm.closed || sm.terminalEventAcked { |
| 102 | return 0, fmt.Errorf("stream is closed") |
| 103 | } |
| 104 | |
| 105 | if sm.connected { |
| 106 | return 0, fmt.Errorf("client already connected") |
| 107 | } |
| 108 | |
| 109 | if dataSender == nil { |
| 110 | return 0, fmt.Errorf("dataSender cannot be nil") |
| 111 | } |
| 112 | |
| 113 | headPos := sm.buf.HeadPos() |
| 114 | if clientSeq > headPos { |
| 115 | bytesToConsume := int(clientSeq - headPos) |
| 116 | available := sm.buf.Size() |
| 117 | if bytesToConsume > available { |
| 118 | return 0, fmt.Errorf("client seq %d is beyond our stream end (head=%d, size=%d)", clientSeq, headPos, available) |
| 119 | } |
| 120 | if bytesToConsume > 0 { |
| 121 | if err := sm.buf.Consume(bytesToConsume); err != nil { |
| 122 | return 0, fmt.Errorf("failed to consume buffer: %w", err) |
| 123 | } |
| 124 | headPos = sm.buf.HeadPos() |
| 125 | } |
| 126 | } |
| 127 | |
| 128 | sm.streamId = streamId |
| 129 | sm.dataSender = dataSender |
| 130 | sm.connected = true |
| 131 | sm.rwndSize = rwndSize |
| 132 | sm.sentNotAcked = 0 |
| 133 | effectiveWindow := sm.cwndSize |
| 134 | if sm.rwndSize < effectiveWindow { |
| 135 | effectiveWindow = sm.rwndSize |
| 136 | } |
| 137 | sm.buf.SetEffectiveWindow(true, effectiveWindow) |
| 138 | sm.drainCond.Signal() |
| 139 | |
| 140 | startSeq := headPos |
| 141 | if clientSeq > startSeq { |
| 142 | startSeq = clientSeq |
| 143 | } |
| 144 | |
| 145 | return startSeq, nil |
| 146 | } |
| 147 | |
| 148 | // GetStreamId returns the current stream ID (safe to call with lock held by caller) |
| 149 | func (sm *StreamManager) GetStreamId() string { |