initConfigController creates the config controller in the pilotConfig.
(args *PilotArgs)
| 77 | |
| 78 | // initConfigController creates the config controller in the pilotConfig. |
| 79 | func (s *Server) initConfigController(args *PilotArgs) error { |
| 80 | meshConfig := s.environment.Mesh() |
| 81 | if len(meshConfig.ConfigSources) > 0 { |
| 82 | // Using MCP for config. |
| 83 | if err := s.initConfigSources(args); err != nil { |
| 84 | return err |
| 85 | } |
| 86 | } else if args.RegistryOptions.FileDir != "" { |
| 87 | // Local files - should be added even if other options are specified; NOTE: as written, this branch only runs when no mesh ConfigSources are configured. |
| 88 | configController, err := file.NewController( |
| 89 | args.RegistryOptions.FileDir, |
| 90 | args.RegistryOptions.KubeOptions.DomainSuffix, |
| 91 | collections.Pilot, |
| 92 | args.RegistryOptions.KubeOptions, |
| 93 | ) |
| 94 | if err != nil { |
| 95 | return err |
| 96 | } |
| 97 | s.ConfigStores = append(s.ConfigStores, configController) |
| 98 | } else { |
| 99 | err := s.initK8SConfigStore(args) |
| 100 | if err != nil { |
| 101 | return err |
| 102 | } |
| 103 | } |
| 104 | |
| 105 | // If running in ingress mode (requires k8s), add the ingress controller as an additional config store. |
| 106 | if hasKubeRegistry(args.RegistryOptions.Registries) && meshConfig.IngressControllerMode != meshconfig.MeshConfig_OFF { |
| 107 | // Create the ingress controller and register it alongside the other config stores. |
| 108 | ic := ingress.NewController( |
| 109 | s.kubeClient, |
| 110 | s.environment.Watcher, |
| 111 | args.RegistryOptions.KubeOptions, |
| 112 | s.XDSServer, |
| 113 | ) |
| 114 | s.ConfigStores = append(s.ConfigStores, ic) |
| 115 | |
| 116 | s.addTerminatingStartFunc("ingress status", func(stop <-chan struct{}) error { |
| 117 | leaderelection. |
| 118 | NewLeaderElection(args.Namespace, args.PodName, leaderelection.IngressController, args.Revision, s.kubeClient). |
| 119 | AddRunFunction(func(leaderStop <-chan struct{}) { |
| 120 | log.Infof("Starting ingress status writer") |
| 121 | ic.SetStatusWrite(true, s.statusManager) |
| 122 | |
| 123 | <-leaderStop |
| 124 | log.Infof("Stopping ingress status writer") |
| 125 | ic.SetStatusWrite(false, nil) |
| 126 | }). |
| 127 | Run(stop) |
| 128 | return nil |
| 129 | }) |
| 130 | } |
| 131 | |
| 132 | // Wrap the config controller with a cache. |
| 133 | aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores) |
| 134 | if err != nil { |
| 135 | return err |
| 136 | } |
no test coverage detected