(dataPk wshrpc.CommandStreamData)
| 177 | } |
| 178 | |
| 179 | func (b *Broker) processRecvData(dataPk wshrpc.CommandStreamData) { |
| 180 | b.lock.Lock() |
| 181 | reader, ok := b.readers[dataPk.Id] |
| 182 | if !ok { |
| 183 | lastSent := b.readerErrorSentTime[dataPk.Id] |
| 184 | now := time.Now() |
| 185 | if now.Sub(lastSent) < time.Second { |
| 186 | b.lock.Unlock() |
| 187 | return |
| 188 | } |
| 189 | b.readerErrorSentTime[dataPk.Id] = now |
| 190 | } |
| 191 | b.lock.Unlock() |
| 192 | |
| 193 | if !ok { |
| 194 | ackPk := wshrpc.CommandStreamAckData{ |
| 195 | Id: dataPk.Id, |
| 196 | Seq: dataPk.Seq, |
| 197 | Cancel: true, |
| 198 | Error: "stream reader not found", |
| 199 | } |
| 200 | b.SendAck(ackPk) |
| 201 | return |
| 202 | } |
| 203 | |
| 204 | reader.RecvData(dataPk) |
| 205 | } |
| 206 | |
| 207 | func (b *Broker) processRecvAck(ackPk wshrpc.CommandStreamAckData) { |
| 208 | b.lock.Lock() |
no test coverage detected