NewDataNode creates a new data node
( hostID string, topo topology.Topology, enumReader mutatorsCom.EnumReader, opts Options)
| 100 | |
| 101 | // NewDataNode creates a new data node |
| 102 | func NewDataNode( |
| 103 | hostID string, |
| 104 | topo topology.Topology, |
| 105 | enumReader mutatorsCom.EnumReader, |
| 106 | opts Options) (DataNode, error) { |
| 107 | |
| 108 | iOpts := opts.InstrumentOptions() |
| 109 | logger := iOpts.Logger().With("datanode", hostID) |
| 110 | |
| 111 | scope := iOpts.MetricsScope().SubScope("namespace"). |
| 112 | Tagged(map[string]string{ |
| 113 | "datanode": hostID, |
| 114 | }) |
| 115 | |
| 116 | metaStore, err := metastore.NewDiskMetaStore(filepath.Join(opts.ServerConfig().RootPath, "metastore")) |
| 117 | if err != nil { |
| 118 | return nil, utils.StackError(err, "failed to initialize local metastore") |
| 119 | } |
| 120 | diskStore := diskstore.NewLocalDiskStore(opts.ServerConfig().RootPath) |
| 121 | |
| 122 | bootstrapServer := bootstrap.NewPeerDataNodeServer(metaStore, diskStore) |
| 123 | bootstrapToken := bootstrapServer.(memCom.BootStrapToken) |
| 124 | |
| 125 | redologCfg := opts.ServerConfig().RedoLogConfig |
| 126 | redoLogManagerMaster, err := redolog.NewRedoLogManagerMaster(opts.ServerConfig().Cluster.Namespace, &redologCfg, diskStore, metaStore) |
| 127 | if err != nil { |
| 128 | return nil, utils.StackError(err, "failed to initialize redolog manager master") |
| 129 | } |
| 130 | |
| 131 | numShards := len(topo.Get().ShardSet().AllIDs()) |
| 132 | memStore := memstore.NewMemStore(metaStore, diskStore, |
| 133 | memstore.NewOptions(bootstrapToken, redoLogManagerMaster, memstore.WithNumShards(numShards))) |
| 134 | |
| 135 | grpcServer := grpc.NewServer() |
| 136 | rpc.RegisterPeerDataNodeServer(grpcServer, bootstrapServer) |
| 137 | reflection.Register(grpcServer) |
| 138 | |
| 139 | d := &dataNode{ |
| 140 | hostID: hostID, |
| 141 | topo: topo, |
| 142 | enumReader: enumReader, |
| 143 | metaStore: metaStore, |
| 144 | memStore: memStore, |
| 145 | diskStore: diskStore, |
| 146 | opts: opts, |
| 147 | logger: logger, |
| 148 | metrics: newDatanodeMetrics(scope), |
| 149 | grpcServer: grpcServer, |
| 150 | redoLogManagerMaster: redoLogManagerMaster, |
| 151 | shardSet: shard.NewShardSet(nil), |
| 152 | close: make(chan struct{}), |
| 153 | readyCh: make(chan struct{}), |
| 154 | } |
| 155 | d.handlers = d.newHandlers() |
| 156 | d.bootstrapManager = NewBootstrapManager(d.hostID, memStore, opts.BootstrapOptions(), topo) |
| 157 | clusterClient, err := d.opts.ServerConfig().Cluster.Etcd.NewClient(instrument.NewOptions()) |
| 158 | if err != nil { |
| 159 | return nil, utils.StackError(err, "failed to create etcd client") |
no test coverage detected