MCPcopy
hub / github.com/flyteorg/flyte / sync

Method sync

flytestdlib/cache/in_memory_auto_refresh.go:312–390  ·  view source on GitHub ↗

There are w.parallelizm instances of this function running all the time, each one will: - Retrieve an item from the workqueue - For each batch of the keys, call syncCb, which tells us if the items have been updated -- If any has, then overwrite the item in the cache. What happens when the number of

(ctx context.Context)

Source from the content-addressed store, hash-verified

310// * Plugin asks for update on item 2 - cache evicts item 1, stores 2 and returns it unchanged
311// * Sync loop updates item 2, repeat
312func (w *InMemoryAutoRefresh) sync(ctx context.Context) (err error) {
313 defer func() {
314 var isErr bool
315 rVal := recover()
316 if rVal == nil {
317 return
318 }
319
320 if err, isErr = rVal.(error); isErr {
321 err = fmt.Errorf("worker panic'd and is shutting down. Error: %w with Stack: %v", err, string(debug.Stack()))
322 } else {
323 err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v with Stack: %v", rVal, string(debug.Stack()))
324 }
325
326 logger.Error(ctx, err)
327 }()
328
329 for {
330 select {
331 case <-ctx.Done():
332 return nil
333 default:
334 batch, shutdown := w.workqueue.Get()
335 if shutdown {
336 logger.Debugf(ctx, "Shutting down worker")
337 return nil
338 }
339 // Since we create batches every time we sync, we will just remove the item from the queue here
340 // regardless of whether it succeeded the sync or not.
341 w.workqueue.Forget(batch)
342 w.workqueue.Done(batch)
343
344 newBatch := make(Batch, 0, len(*batch))
345 for _, b := range *batch {
346 itemID := b.GetID()
347 w.processing.Delete(itemID)
348 item, ok := w.lruMap.Get(itemID)
349 if !ok {
350 logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
351 continue
352 }
353 if item.(Item).IsTerminal() {
354 logger.Debugf(ctx, "item with id [%v] is terminal", itemID)
355 continue
356 }
357 newBatch = append(newBatch, b)
358 }
359 if len(newBatch) == 0 {
360 continue
361 }
362
363 t := w.metrics.SyncLatency.Start()
364 updatedBatch, err := w.syncCb(ctx, newBatch)
365
366 if err != nil {
367 w.metrics.SyncErrors.Inc()
368 logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err)
369 t.Stop()

Callers 1

StartMethod · 0.95

Calls 14

UpdateMethod · 0.95
DeleteMethod · 0.95
ErrorFunction · 0.92
DebugfFunction · 0.92
ErrorfFunction · 0.92
ErrorfMethod · 0.80
GetMethod · 0.65
GetIDMethod · 0.65
DeleteMethod · 0.65
IsTerminalMethod · 0.65
StartMethod · 0.65
StopMethod · 0.65

Tested by

no test coverage detected