MCPcopy
hub / github.com/garethgeorge/backrest / RunSync

Method RunSync

internal/api/syncapi/syncmanager.go:106–237  ·  view source on GitHub ↗

Note: top level function will be called holding the lock, must kick off goroutines and then return.

(ctx context.Context)

Source from the content-addressed store, hash-verified

104
105// Note: top level function will be called holding the lock, must kick off goroutines and then return.
106func (m *SyncManager) RunSync(ctx context.Context) {
107 var syncWg sync.WaitGroup
108 var cancelLastSync context.CancelFunc
109
110 configWatchCh := m.configMgr.OnChange.Subscribe()
111 defer m.configMgr.OnChange.Unsubscribe(configWatchCh)
112 defer func() {
113 zap.L().Info("syncmanager exited")
114 }()
115
116 startSync := func(config *v1.Config) {
117 m.mu.Lock()
118 defer m.mu.Unlock()
119
120 if cancelLastSync != nil {
121 cancelLastSync()
122 zap.L().Info("syncmanager applying new config, waiting for existing sync goroutines to exit")
123 syncWg.Wait()
124 } else {
125 zap.L().Info("syncmanager applying new config, starting sync goroutines")
126 }
127 syncCtx, cancel := context.WithCancel(ctx)
128 cancelLastSync = cancel
129
130 if config.Multihost.GetIdentity() == nil {
131 zap.S().Info("syncmanager no identity key configured, sync feature is disabled.")
132 m.snapshot = nil // Clear the snapshot to indicate sync is disabled
133 return
134 }
135
136 // Pull out configuration from the new config and cache it for sync handler e.g. the config and identity key.
137 identityKey, err := cryptoutil.NewPrivateKey(config.Multihost.GetIdentity())
138 if err != nil {
139 zap.S().Warnf("syncmanager failed to load local instance identity key, synchandler will reject requests: %v", err)
140 return
141 }
142
143 m.snapshot = &syncConfigSnapshot{
144 config: config,
145 identityKey: identityKey,
146 }
147
148 // Past this point, determine if sync clients are configured and start threads for any.
149 if len(config.Multihost.GetKnownHosts()) == 0 {
150 zap.L().Info("syncmanager no known host peers declared, sync client exiting early")
151 return
152 }
153
154 zap.S().Infof("sync using identity %v, spawning goroutines for %d known peers",
155 config.Multihost.GetIdentity().GetKeyid(), len(config.Multihost.GetKnownHosts()))
156 for _, knownHostPeer := range config.Multihost.KnownHosts {
157 if knownHostPeer.InstanceId == "" {
158 continue
159 }
160
161 syncWg.Add(1)
162 go func(knownHostPeer *v1.Multihost_Peer) {
163 defer syncWg.Done()

Callers 1

runAppFunction · 0.95

Calls 13

NewPrivateKeyFunction · 0.92
LockMethod · 0.80
GetIdentityMethod · 0.80
GetKnownHostsMethod · 0.80
DoneMethod · 0.80
CloneMethod · 0.80
SubscribeMethod · 0.65
UnsubscribeMethod · 0.65
AddMethod · 0.65
GetMethod · 0.65
UnlockMethod · 0.45

Tested by

no test coverage detected