(l net.Listener, store *raftwal.DiskStorage)
| 137 | } |
| 138 | |
| 139 | func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { |
| 140 | x.RegisterExporters(Zero.Conf, "dgraph.zero") |
| 141 | grpcOpts := []grpc.ServerOption{ |
| 142 | grpc.MaxRecvMsgSize(x.GrpcMaxSize), |
| 143 | grpc.MaxSendMsgSize(x.GrpcMaxSize), |
| 144 | grpc.MaxConcurrentStreams(1000), |
| 145 | grpc.StatsHandler(otelgrpc.NewServerHandler()), |
| 146 | grpc.UnaryInterceptor(audit.AuditRequestGRPC), |
| 147 | } |
| 148 | |
| 149 | tlsConf, err := x.LoadServerTLSConfigForInternalPort(Zero.Conf) |
| 150 | x.Check(err) |
| 151 | if tlsConf != nil { |
| 152 | grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConf))) |
| 153 | } |
| 154 | s := grpc.NewServer(grpcOpts...) |
| 155 | |
| 156 | nodeId := opts.raft.GetUint64("idx") |
| 157 | rc := pb.RaftContext{ |
| 158 | Id: nodeId, |
| 159 | Addr: x.WorkerConfig.MyAddr, |
| 160 | Group: 0, |
| 161 | IsLearner: opts.raft.GetBool("learner"), |
| 162 | } |
| 163 | m := conn.NewNode(&rc, store, opts.tlsClientConfig) |
| 164 | |
| 165 | // Zero followers should not be forwarding proposals to the leader, to avoid txn commits which |
| 166 | // were calculated in a previous Zero leader. |
| 167 | m.Cfg.DisableProposalForwarding = true |
| 168 | st.rs = conn.NewRaftServer(m) |
| 169 | |
| 170 | st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)} |
| 171 | st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node, tlsClientConfig: opts.tlsClientConfig} |
| 172 | st.zero.Init() |
| 173 | st.node.server = st.zero |
| 174 | |
| 175 | pb.RegisterZeroServer(s, st.zero) |
| 176 | pb.RegisterRaftServer(s, st.rs) |
| 177 | |
| 178 | go func() { |
| 179 | defer st.zero.closer.Done() |
| 180 | err := s.Serve(l) |
| 181 | glog.Infof("gRPC server stopped : %v", err) |
| 182 | |
| 183 | // Attempt graceful stop (waits for pending RPCs), but force a stop if |
| 184 | // it doesn't happen in a reasonable amount of time. |
| 185 | done := make(chan struct{}) |
| 186 | const timeout = 5 * time.Second |
| 187 | go func() { |
| 188 | s.GracefulStop() |
| 189 | close(done) |
| 190 | }() |
| 191 | select { |
| 192 | case <-done: |
| 193 | case <-time.After(timeout): |
| 194 | glog.Infof("Stopping grpc gracefully is taking longer than %v."+ |
| 195 | " Force stopping now. Pending RPCs will be abandoned.", timeout) |
| 196 | s.Stop() |
no test coverage detected