MCPcopy
hub / github.com/dgraph-io/dgraph / serveGRPC

Method serveGRPC

dgraph/cmd/zero/run.go:139–199  ·  view source on GitHub ↗
(l net.Listener, store *raftwal.DiskStorage)

Source from the content-addressed store, hash-verified

137}
138
139func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
140 x.RegisterExporters(Zero.Conf, "dgraph.zero")
141 grpcOpts := []grpc.ServerOption{
142 grpc.MaxRecvMsgSize(x.GrpcMaxSize),
143 grpc.MaxSendMsgSize(x.GrpcMaxSize),
144 grpc.MaxConcurrentStreams(1000),
145 grpc.StatsHandler(otelgrpc.NewServerHandler()),
146 grpc.UnaryInterceptor(audit.AuditRequestGRPC),
147 }
148
149 tlsConf, err := x.LoadServerTLSConfigForInternalPort(Zero.Conf)
150 x.Check(err)
151 if tlsConf != nil {
152 grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConf)))
153 }
154 s := grpc.NewServer(grpcOpts...)
155
156 nodeId := opts.raft.GetUint64("idx")
157 rc := pb.RaftContext{
158 Id: nodeId,
159 Addr: x.WorkerConfig.MyAddr,
160 Group: 0,
161 IsLearner: opts.raft.GetBool("learner"),
162 }
163 m := conn.NewNode(&rc, store, opts.tlsClientConfig)
164
165 // Zero followers should not be forwarding proposals to the leader, to avoid txn commits which
166 // were calculated in a previous Zero leader.
167 m.Cfg.DisableProposalForwarding = true
168 st.rs = conn.NewRaftServer(m)
169
170 st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)}
171 st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node, tlsClientConfig: opts.tlsClientConfig}
172 st.zero.Init()
173 st.node.server = st.zero
174
175 pb.RegisterZeroServer(s, st.zero)
176 pb.RegisterRaftServer(s, st.rs)
177
178 go func() {
179 defer st.zero.closer.Done()
180 err := s.Serve(l)
181 glog.Infof("gRPC server stopped : %v", err)
182
183 // Attempt graceful stop (waits for pending RPCs), but force a stop if
184 // it doesn't happen in a reasonable amount of time.
185 done := make(chan struct{})
186 const timeout = 5 * time.Second
187 go func() {
188 s.GracefulStop()
189 close(done)
190 }()
191 select {
192 case <-done:
193 case <-time.After(timeout):
194 glog.Infof("Stopping grpc gracefully is taking longer than %v."+
195 " Force stopping now. Pending RPCs will be abandoned.", timeout)
196 s.Stop()

Callers 1

runFunction · 0.95

Calls 11

RegisterExportersFunction · 0.92
CheckFunction · 0.92
NewNodeFunction · 0.92
NewRaftServerFunction · 0.92
RegisterZeroServerFunction · 0.92
RegisterRaftServerFunction · 0.92
InfofMethod · 0.80
StopMethod · 0.65
InitMethod · 0.45
DoneMethod · 0.45

Tested by

no test coverage detected