initServiceControllers creates and initializes the service controllers
(args *PilotArgs)
| 34 | |
| 35 | // initServiceControllers creates and initializes the service controllers |
| 36 | func (s *Server) initServiceControllers(args *PilotArgs) error { |
| 37 | serviceControllers := s.ServiceController() |
| 38 | |
| 39 | s.serviceEntryController = serviceentry.NewController( |
| 40 | s.configController, |
| 41 | s.XDSServer, |
| 42 | s.multiclusterController, |
| 43 | s.environment.Watcher, |
| 44 | serviceentry.WithClusterID(s.clusterID), |
| 45 | serviceentry.WithKRTDebugger(s.krtDebugger), |
| 46 | ) |
| 47 | serviceControllers.AddRegistry(s.serviceEntryController) |
| 48 | |
| 49 | registered := sets.New[provider.ID]() |
| 50 | for _, r := range args.RegistryOptions.Registries { |
| 51 | serviceRegistry := provider.ID(r) |
| 52 | if registered.Contains(serviceRegistry) { |
| 53 | log.Warnf("%s registry specified multiple times.", r) |
| 54 | continue |
| 55 | } |
| 56 | registered.Insert(serviceRegistry) |
| 57 | log.Infof("Adding %s registry adapter", serviceRegistry) |
| 58 | switch serviceRegistry { |
| 59 | case provider.Kubernetes: |
| 60 | if err := s.initKubeRegistry(args); err != nil { |
| 61 | return err |
| 62 | } |
| 63 | default: |
| 64 | return fmt.Errorf("service registry %s is not supported", r) |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | // Defer running of the service controllers. |
| 69 | s.addStartFunc("service controllers", func(stop <-chan struct{}) error { |
| 70 | go serviceControllers.Run(stop) |
| 71 | return nil |
| 72 | }) |
| 73 | |
| 74 | if features.EnableAmbient { |
| 75 | s.ambientIndex = ambient.New(ambient.Options{ |
| 76 | SystemNamespace: args.Namespace, |
| 77 | DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix, |
| 78 | ClusterID: s.clusterID, |
| 79 | Revision: args.Revision, |
| 80 | XDSUpdater: s.XDSServer, |
| 81 | MeshConfig: s.environment.Watcher, |
| 82 | StatusNotifier: args.RegistryOptions.KubeOptions.StatusWritingEnabled, |
| 83 | Debugger: args.KrtDebugger, |
| 84 | MultiClusterController: s.multiclusterController, |
| 85 | Flags: ambient.FeatureFlags{ |
| 86 | DefaultAllowFromWaypoint: features.DefaultAllowFromWaypoint, |
| 87 | EnableK8SServiceSelectWorkloadEntries: features.EnableK8SServiceSelectWorkloadEntries, |
| 88 | }, |
| 89 | }) |
| 90 | s.environment.AmbientIndexes = s.ambientIndex |
| 91 | |
| 92 | s.addStartFunc("ambient index", func(stop <-chan struct{}) error { |
| 93 | go s.ambientIndex.Run(stop) |
no test coverage detected