MCPcopy
hub / github.com/perkeep/perkeep / newSyncFromConfig

Function newSyncFromConfig

pkg/server/sync.go:134–221  ·  view source on GitHub ↗
(ld blobserver.Loader, conf jsonconfig.Obj)

Source from the content-addressed store, hash-verified

// validateOnStartDefault is the fallback value for the "validateOnStart"
// config option, taken from the CAMLI_SYNC_VALIDATE environment variable.
// The ParseBool error is intentionally discarded: an unset or unparsable
// value leaves the default at false.
132var validateOnStartDefault, _ = strconv.ParseBool(os.Getenv("CAMLI_SYNC_VALIDATE"))
133
134func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) {
135 var (
136 from = conf.RequiredString("from")
137 to = conf.RequiredString("to")
138 fullSync = conf.OptionalBool("fullSyncOnStart", false)
139 blockFullSync = conf.OptionalBool("blockingFullSyncOnStart", false)
140 idle = conf.OptionalBool("idle", false)
141 queueConf = conf.OptionalObject("queue")
142 copierPoolSize = conf.OptionalInt("copierPoolSize", 5)
143 validate = conf.OptionalBool("validateOnStart", validateOnStartDefault)
144 hourlyCompare = conf.OptionalInt("hourlyCompareBytes", 0)
145 )
146 if err := conf.Validate(); err != nil {
147 return nil, err
148 }
149 if idle {
150 return newIdleSyncHandler(from, to), nil
151 }
152 if len(queueConf) == 0 {
153 return nil, errors.New(`Missing required "queue" object`)
154 }
155 q, err := sorted.NewKeyValueMaybeWipe(queueConf)
156 if err != nil {
157 return nil, err
158 }
159
160 isToIndex := false
161 fromBs, err := ld.GetStorage(from)
162 if err != nil {
163 return nil, err
164 }
165 toBs, err := ld.GetStorage(to)
166 if err != nil {
167 return nil, err
168 }
169 if _, ok := fromBs.(*index.Index); !ok {
170 if _, ok := toBs.(*index.Index); ok {
171 isToIndex = true
172 }
173 }
174
175 sh := newSyncHandler(from, to, fromBs, toBs, q)
176 sh.toIndex = isToIndex
177 sh.copierPoolSize = copierPoolSize
178 if err := sh.readQueueToMemory(); err != nil {
179 return nil, fmt.Errorf("Error reading sync queue to memory: %v", err)
180 }
181
182 if fullSync || blockFullSync {
183 sh.logf("Doing full sync")
184 didFullSync := make(chan bool, 1)
185 go func() {
186 for {
187 n := sh.runSync("pending blobs queue", sh.enumeratePendingBlobs)
188 if n > 0 {
189 sh.logf("Queue sync copied %d blobs", n)
190 continue
191 }

Callers

nothing calls this directly

Calls 13

NewKeyValueMaybeWipe (Function) · 0.92
GetHub (Function) · 0.92
newIdleSyncHandler (Function) · 0.85
newSyncHandler (Function) · 0.85
blobserverEnumerator (Function) · 0.85
readQueueToMemory (Method) · 0.80
runSync (Method) · 0.80
syncLoop (Method) · 0.80
startFullValidation (Method) · 0.80
hourlyCompare (Method) · 0.80
GetStorage (Method) · 0.65
AddReceiveHook (Method) · 0.65

Tested by

no test coverage detected