NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh
( name string, syncCb SyncFunc, syncRateLimiter workqueue.TypedRateLimiter[*Batch], resyncPeriod time.Duration, parallelizm uint, size uint, scope promutils.Scope, options ...Option, )
| 118 | |
| 119 | // NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh |
| 120 | func NewInMemoryAutoRefresh( |
| 121 | name string, |
| 122 | syncCb SyncFunc, |
| 123 | syncRateLimiter workqueue.TypedRateLimiter[*Batch], |
| 124 | resyncPeriod time.Duration, |
| 125 | parallelizm uint, |
| 126 | size uint, |
| 127 | scope promutils.Scope, |
| 128 | options ...Option, |
| 129 | ) (*InMemoryAutoRefresh, error) { |
| 130 | opts := defaultOptions() |
| 131 | for _, option := range options { |
| 132 | option(opts) |
| 133 | } |
| 134 | |
| 135 | metrics := newMetrics(scope) |
| 136 | // #nosec G115 |
| 137 | lruCache, err := lru.NewWithEvict(int(size), getEvictionFunction(metrics.Evictions)) |
| 138 | if err != nil { |
| 139 | return nil, fmt.Errorf("creating LRU cache: %w", err) |
| 140 | } |
| 141 | |
| 142 | cache := &InMemoryAutoRefresh{ |
| 143 | name: name, |
| 144 | metrics: metrics, |
| 145 | parallelizm: parallelizm, |
| 146 | createBatchesCb: opts.createBatchesCb, |
| 147 | syncCb: syncCb, |
| 148 | lruMap: lruCache, |
| 149 | processing: &sync.Map{}, |
| 150 | toDelete: newSyncSet(), |
| 151 | syncPeriod: resyncPeriod, |
| 152 | workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(syncRateLimiter, workqueue.TypedRateLimitingQueueConfig[*Batch]{ |
| 153 | Name: scope.CurrentScope(), |
| 154 | Clock: opts.clock, |
| 155 | }), |
| 156 | clock: opts.clock, |
| 157 | syncCount: atomic.NewInt32(0), |
| 158 | enqueueCount: atomic.NewInt32(0), |
| 159 | enqueueLoopRunning: atomic.NewBool(false), |
| 160 | syncOnCreate: opts.syncOnCreate, |
| 161 | } |
| 162 | |
| 163 | return cache, nil |
| 164 | } |
| 165 | |
| 166 | func (w *InMemoryAutoRefresh) Start(ctx context.Context) error { |
| 167 | for i := uint(0); i < w.parallelizm; i++ { |