Type definitions moved to types.go
(masters string, templateFS http.FileSystem, dataDir string, icebergPort int)
| 135 | // Type definitions moved to types.go |
| 136 | |
| 137 | func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, icebergPort int) *AdminServer { |
| 138 | grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.admin") |
| 139 | |
| 140 | // Create master client with multiple master support |
| 141 | masterClient := wdclient.NewMasterClient( |
| 142 | grpcDialOption, |
| 143 | "", // filerGroup - not needed for admin |
| 144 | "admin", // clientType |
| 145 | "", // clientHost - not needed for admin |
| 146 | "", // dataCenter - not needed for admin |
| 147 | "", // rack - not needed for admin |
| 148 | *pb.ServerAddresses(masters).ToServiceDiscovery(), |
| 149 | ) |
| 150 | |
| 151 | // Start master client connection process (like shell and filer do) |
| 152 | bgCtx, bgCancel := context.WithCancel(context.Background()) |
| 153 | go masterClient.KeepConnectedToMaster(bgCtx) |
| 154 | |
| 155 | lockManager := NewAdminLockManager(masterClient, adminLockClientName) |
| 156 | presenceLock := newAdminPresenceLock(masterClient) |
| 157 | if presenceLock != nil { |
| 158 | presenceLock.Start() |
| 159 | } |
| 160 | |
| 161 | server := &AdminServer{ |
| 162 | masterClient: masterClient, |
| 163 | templateFS: templateFS, |
| 164 | dataDir: dataDir, |
| 165 | grpcDialOption: grpcDialOption, |
| 166 | cacheExpiration: defaultCacheTimeout, |
| 167 | filerCacheExpiration: defaultFilerCacheTimeout, |
| 168 | configPersistence: NewConfigPersistence(dataDir), |
| 169 | collectionStatsCacheThreshold: defaultStatsCacheTimeout, |
| 170 | s3TablesManager: newS3TablesManager(), |
| 171 | icebergPort: icebergPort, |
| 172 | pluginLock: lockManager, |
| 173 | adminPresenceLock: presenceLock, |
| 174 | bgCancel: bgCancel, |
| 175 | } |
| 176 | |
| 177 | // Initialize topic retention purger |
| 178 | server.topicRetentionPurger = NewTopicRetentionPurger(server) |
| 179 | |
| 180 | // Initialize credential manager with defaults |
| 181 | credentialManager, err := credential.NewCredentialManagerWithDefaults(credential.StoreTypeGrpc) |
| 182 | if err != nil { |
| 183 | glog.Warningf("Failed to initialize credential manager: %v", err) |
| 184 | // Continue without credential manager - will fall back to legacy approach |
| 185 | } else { |
| 186 | server.credentialManager = credentialManager |
| 187 | glog.V(0).Infof("Credential manager initialized with store type: %s", credentialManager.GetStore().GetName()) |
| 188 | |
| 189 | // For stores that need filer address function, configure them |
| 190 | if store := credentialManager.GetStore(); store != nil { |
| 191 | if filerFuncSetter, ok := store.(interface { |
| 192 | SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption) |
| 193 | }); ok { |
| 194 | // Configure the filer address function to dynamically return the current active filer |
no test coverage detected