Client returns a grpc client that can talk to any Alpha in the cluster
()
| 1011 | |
| 1012 | // Client returns a grpc client that can talk to any Alpha in the cluster |
| 1013 | func (c *LocalCluster) Client() (*dgraphapi.GrpcClient, func(), error) { |
| 1014 | // TODO(aman): can we cache the connections? |
| 1015 | retryPolicy := `{ |
| 1016 | "methodConfig": [{ |
| 1017 | "retryPolicy": { |
| 1018 | "MaxAttempts": 4, |
| 1019 | "InitialBackoff": ".01s", |
| 1020 | "MaxBackoff": ".01s", |
| 1021 | "BackoffMultiplier": 1.0, |
| 1022 | "RetryableStatusCodes": [ "UNAVAILABLE" ] |
| 1023 | } |
| 1024 | }] |
| 1025 | }` |
| 1026 | var apiClients []api.DgraphClient |
| 1027 | var conns []*grpc.ClientConn |
| 1028 | for _, aa := range c.alphas { |
| 1029 | if !aa.isRunning { |
| 1030 | continue |
| 1031 | } |
| 1032 | url, err := aa.alphaURL(c) |
| 1033 | if err != nil { |
| 1034 | return nil, nil, errors.Wrap(err, "error getting health URL") |
| 1035 | } |
| 1036 | conn, err := grpc.NewClient(url, |
| 1037 | grpc.WithTransportCredentials(insecure.NewCredentials()), |
| 1038 | grpc.WithDefaultServiceConfig(retryPolicy)) |
| 1039 | if err != nil { |
| 1040 | return nil, nil, errors.Wrap(err, "error connecting to alpha") |
| 1041 | } |
| 1042 | conns = append(conns, conn) |
| 1043 | apiClients = append(apiClients, api.NewDgraphClient(conn)) |
| 1044 | } |
| 1045 | |
| 1046 | if len(apiClients) == 0 { |
| 1047 | return nil, nil, errors.New("no alphas running") |
| 1048 | } |
| 1049 | client := dgo.NewDgraphClient(apiClients...) |
| 1050 | cleanup := func() { |
| 1051 | for _, conn := range conns { |
| 1052 | if err := conn.Close(); err != nil { |
| 1053 | log.Printf("[WARNING] problem closing connection: %v", err) |
| 1054 | } |
| 1055 | } |
| 1056 | } |
| 1057 | return &dgraphapi.GrpcClient{Dgraph: client}, cleanup, nil |
| 1058 | } |
| 1059 | |
| 1060 | func (c *LocalCluster) AlphaClient(id int) (*dgraphapi.GrpcClient, func(), error) { |
| 1061 | alpha := c.alphas[id] |