KeepAlive is like client/server ping process, to notify health about each other
(stream pb.PeerDataNode_KeepAliveServer)
| 181 | |
| 182 | // KeepAlive is like client/server ping process, to notify health about each other |
| 183 | func (p *PeerDataNodeServerImpl) KeepAlive(stream pb.PeerDataNode_KeepAliveServer) error { |
| 184 | utils.GetLogger().With("action", "bootstrap").Info("keep alive called") |
| 185 | var sessionInfo *sessionInfo |
| 186 | var err error |
| 187 | |
| 188 | defer func() { |
| 189 | if sessionInfo != nil { |
| 190 | if err == nil { |
| 191 | logInfoMsg(sessionInfo, "keep alive stoped") |
| 192 | } else { |
| 193 | logErrorMsg(sessionInfo, err, "keep alive failed") |
| 194 | } |
| 195 | } else { |
| 196 | utils.GetLogger().With("action", "bootstrap", "error", err).Error("keep alive stopped") |
| 197 | } |
| 198 | }() |
| 199 | |
| 200 | for { |
| 201 | var session *pb.Session |
| 202 | session, err = stream.Recv() |
| 203 | if err == io.EOF { |
| 204 | err = nil |
| 205 | break |
| 206 | } |
| 207 | if err != nil { |
| 208 | return err |
| 209 | } |
| 210 | |
| 211 | if sessionInfo == nil { |
| 212 | if err = p.validateSessionSource(session.ID, session.NodeID); err != nil { |
| 213 | return err |
| 214 | } |
| 215 | sessionInfo, _ = p.getSession(session.ID) |
| 216 | } |
| 217 | // update last live time |
| 218 | sessionInfo.lastLiveTime = utils.Now() |
| 219 | |
| 220 | if err = stream.Send(&pb.KeepAliveResponse{ID: session.ID, Ttl: sessionInfo.ttl}); err != nil { |
| 221 | return err |
| 222 | } |
| 223 | } |
| 224 | if sessionInfo != nil { |
| 225 | p.cleanSession(sessionInfo.sessionID, true) |
| 226 | } |
| 227 | return nil |
| 228 | } |
| 229 | |
| 230 | // FetchTableShardMetaData to retrieve all metadata for one table/shard |
| 231 | func (p *PeerDataNodeServerImpl) FetchTableShardMetaData(ctx context.Context, req *pb.TableShardMetaDataRequest) (*pb.TableShardMetaData, error) { |
nothing calls this directly
no test coverage detected