Note: the top-level function is called while holding the lock; it must kick off goroutines and then return.
(ctx context.Context)
| 104 | |
| 105 | // Note: the top-level function is called while holding the lock; it must kick off goroutines and then return. |
| 106 | func (m *SyncManager) RunSync(ctx context.Context) { |
| 107 | var syncWg sync.WaitGroup |
| 108 | var cancelLastSync context.CancelFunc |
| 109 | |
| 110 | configWatchCh := m.configMgr.OnChange.Subscribe() |
| 111 | defer m.configMgr.OnChange.Unsubscribe(configWatchCh) |
| 112 | defer func() { |
| 113 | zap.L().Info("syncmanager exited") |
| 114 | }() |
| 115 | |
| 116 | startSync := func(config *v1.Config) { |
| 117 | m.mu.Lock() |
| 118 | defer m.mu.Unlock() |
| 119 | |
| 120 | if cancelLastSync != nil { |
| 121 | cancelLastSync() |
| 122 | zap.L().Info("syncmanager applying new config, waiting for existing sync goroutines to exit") |
| 123 | syncWg.Wait() |
| 124 | } else { |
| 125 | zap.L().Info("syncmanager applying new config, starting sync goroutines") |
| 126 | } |
| 127 | syncCtx, cancel := context.WithCancel(ctx) |
| 128 | cancelLastSync = cancel |
| 129 | |
| 130 | if config.Multihost.GetIdentity() == nil { |
| 131 | zap.S().Info("syncmanager no identity key configured, sync feature is disabled.") |
| 132 | m.snapshot = nil // Clear the snapshot to indicate sync is disabled |
| 133 | return |
| 134 | } |
| 135 | |
| 136 | // Pull out configuration from the new config and cache it for sync handler e.g. the config and identity key. |
| 137 | identityKey, err := cryptoutil.NewPrivateKey(config.Multihost.GetIdentity()) |
| 138 | if err != nil { |
| 139 | zap.S().Warnf("syncmanager failed to load local instance identity key, synchandler will reject requests: %v", err) |
| 140 | return |
| 141 | } |
| 142 | |
| 143 | m.snapshot = &syncConfigSnapshot{ |
| 144 | config: config, |
| 145 | identityKey: identityKey, |
| 146 | } |
| 147 | |
| 148 | // Past this point, determine if sync clients are configured and start threads for any. |
| 149 | if len(config.Multihost.GetKnownHosts()) == 0 { |
| 150 | zap.L().Info("syncmanager no known host peers declared, sync client exiting early") |
| 151 | return |
| 152 | } |
| 153 | |
| 154 | zap.S().Infof("sync using identity %v, spawning goroutines for %d known peers", |
| 155 | config.Multihost.GetIdentity().GetKeyid(), len(config.Multihost.GetKnownHosts())) |
| 156 | for _, knownHostPeer := range config.Multihost.KnownHosts { |
| 157 | if knownHostPeer.InstanceId == "" { |
| 158 | continue |
| 159 | } |
| 160 | |
| 161 | syncWg.Add(1) |
| 162 | go func(knownHostPeer *v1.Multihost_Peer) { |
| 163 | defer syncWg.Done() |
no test coverage detected