(ackPk wshrpc.CommandStreamAckData)
| 146 | } |
| 147 | |
| 148 | func (b *Broker) processSendAck(ackPk wshrpc.CommandStreamAckData) { |
| 149 | b.lock.Lock() |
| 150 | route, ok := b.writerRoutes[ackPk.Id] |
| 151 | b.lock.Unlock() |
| 152 | if !ok { |
| 153 | return |
| 154 | } |
| 155 | |
| 156 | opts := &wshrpc.RpcOpts{ |
| 157 | Route: route, |
| 158 | NoResponse: true, |
| 159 | } |
| 160 | b.rpcClient.StreamDataAckCommand(ackPk, opts) |
| 161 | |
| 162 | if ackPk.Fin || ackPk.Cancel { |
| 163 | b.cleanupReader(ackPk.Id) |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | func (b *Broker) processSendData(dataPk wshrpc.CommandStreamData) { |
| 168 | b.lock.Lock() |
no test coverage detected