hub / github.com/flyteorg/flyte / ExampleNewAutoRefreshCache

Function ExampleNewAutoRefreshCache

flytestdlib/utils/auto_refresh_example_test.go:43–96


func ExampleNewAutoRefreshCache() {
	// This auto-refresh cache suits cases where keys are created by the caller but processed by
	// an external service, and we want to track their progress asynchronously.
	exampleService := newExampleService()

	// Define a sync callback that the cache uses to refresh items in the background.
	syncItemCb := func(ctx context.Context, obj CacheItem) (CacheItem, CacheSyncAction, error) {
		oldItem := obj.(*ExampleCacheItem)
		newItem := exampleService.getStatus(oldItem.ID())
		if newItem.status != oldItem.status {
			return newItem, Update, nil
		}
		return newItem, Unchanged, nil
	}

	// The resync period is how often we want the cache to refresh. It can be set as low as we like, but
	// the cache is still constrained by the time it takes to run the sync call for each item.
	resyncPeriod := time.Millisecond

	// Since the number of items in the cache is dynamic, the rate limiter is our knob for controlling
	// the resources spent on sync.
	rateLimiter := NewRateLimiter("ExampleRateLimiter", 10000, 1)

	// Since the cache refreshes itself asynchronously, it may not notice immediately that an object has
	// been deleted, so users of the cache should make their delete logic aware of this shortcoming
	// (e.g. not-exists may be a valid error during removal if it is based on the status in the cache).
	cache, err := NewAutoRefreshCache(syncItemCb, rateLimiter, resyncPeriod, 100, nil)
	if err != nil {
		panic(err)
	}

	// Start the cache with a cancellable context; cancelling the context stops the cache.
	ctx, cancel := context.WithCancel(context.Background())
	cache.Start(ctx)

	// Create objects that go through a couple of state transitions to reach the final state.
	item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
	item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
	_, err1 := cache.GetOrCreate(item1)
	_, err2 := cache.GetOrCreate(item2)
	if err1 != nil || err2 != nil {
		fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
	}

	// Wait for the cache to go through a few refresh cycles, then check the status.
	time.Sleep(resyncPeriod * 10)
	fmt.Printf("Current status for item1 is %v", cache.Get(item1.ID()).(*ExampleCacheItem).status)

	// Stop the cache.
	cancel()

	// Output:
	// Current status for item1 is Completed
}
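
The example above relies on three ideas: a sync callback that reports whether an item changed, a background loop that re-runs that callback on a fixed period, and a context whose cancellation stops the loop. The standalone sketch below illustrates that pattern using only the Go standard library. It is not the flytestdlib implementation; `refreshCache`, `syncFunc`, `advance`, and `refreshOnce` are hypothetical names introduced for illustration, and rate limiting and size bounds are left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// syncFunc is a hypothetical stand-in for the sync callback: it returns the
// latest value for a key and whether it differs from the cached value.
type syncFunc func(ctx context.Context, key, cached string) (latest string, changed bool)

// refreshCache is a hypothetical, much-simplified auto-refresh cache.
type refreshCache struct {
	mu       sync.RWMutex
	items    map[string]string
	syncItem syncFunc
}

func newRefreshCache(s syncFunc) *refreshCache {
	return &refreshCache{items: map[string]string{}, syncItem: s}
}

// GetOrCreate returns the cached value for key, storing initial if absent.
func (c *refreshCache) GetOrCreate(key, initial string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.items[key]; ok {
		return v
	}
	c.items[key] = initial
	return initial
}

// Get returns the cached value and whether the key is present.
func (c *refreshCache) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.items[key]
	return v, ok
}

// refreshOnce runs the sync callback for every key and stores changed values.
func (c *refreshCache) refreshOnce(ctx context.Context) {
	// Copy the items first so the callback runs without holding the lock.
	c.mu.RLock()
	snapshot := make(map[string]string, len(c.items))
	for k, v := range c.items {
		snapshot[k] = v
	}
	c.mu.RUnlock()
	for k, v := range snapshot {
		if latest, changed := c.syncItem(ctx, k, v); changed {
			c.mu.Lock()
			c.items[k] = latest
			c.mu.Unlock()
		}
	}
}

// Start refreshes in the background every period until ctx is cancelled.
func (c *refreshCache) Start(ctx context.Context, period time.Duration) {
	go func() {
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				c.refreshOnce(ctx)
			}
		}
	}()
}

// advance moves a status one step toward "Completed" on each sync.
func advance(_ context.Context, _, cached string) (string, bool) {
	next := map[string]string{"NotStarted": "Running", "Running": "Completed"}
	n, ok := next[cached]
	return n, ok
}

func main() {
	c := newRefreshCache(advance)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancelling the context stops the refresh goroutine
	c.GetOrCreate("item1", "NotStarted")
	c.Start(ctx, time.Millisecond)
	time.Sleep(50 * time.Millisecond)
	status, _ := c.Get("item1")
	fmt.Println("item1:", status)
}
```

As in the original example, the sleep only gives the background loop time to run a few cycles; a caller that needs a guarantee would poll `Get` until the expected status appears rather than rely on timing.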

Callers

nothing calls this directly

Calls 10

Start · Method · 0.95
GetOrCreate · Method · 0.95
Get · Method · 0.95
ID · Method · 0.95
NewRateLimiter · Function · 0.85
newExampleService · Function · 0.70
NewAutoRefreshCache · Function · 0.70
ID · Method · 0.65
Printf · Method · 0.65
getStatus · Method · 0.45
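
Several of the calls listed above (newExampleService, getStatus, ID) resolve to helpers defined outside lines 43–96, so their bodies are not shown on this page. The sketch below is one hypothetical way such helpers could look, assuming a service that advances an item's status by one step each time it is polled. The names `exampleItem`, `exampleStatus`, and `newService` are invented for illustration and are not the definitions in the repository.

```go
package main

import (
	"fmt"
	"sync"
)

// exampleStatus is a hypothetical status type; the real one is defined
// outside the excerpt.
type exampleStatus string

const (
	statusNotStarted exampleStatus = "NotStarted"
	statusStarted    exampleStatus = "Started"
	statusCompleted  exampleStatus = "Completed"
)

// exampleItem pairs an identifier with its last known status.
type exampleItem struct {
	id     string
	status exampleStatus
}

// ID returns the cache key for the item.
func (e *exampleItem) ID() string { return e.id }

// exampleService simulates an external system that advances an item's
// status by one step every time it is polled.
type exampleService struct {
	mu     sync.Mutex
	status map[string]exampleStatus
}

func newService() *exampleService {
	return &exampleService{status: map[string]exampleStatus{}}
}

// getStatus advances the stored status by one step and returns a snapshot.
// An unknown id is treated as not started.
func (s *exampleService) getStatus(id string) *exampleItem {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch s.status[id] {
	case "", statusNotStarted:
		s.status[id] = statusStarted
	case statusStarted:
		s.status[id] = statusCompleted
	}
	return &exampleItem{id: id, status: s.status[id]}
}

func main() {
	svc := newService()
	for i := 0; i < 3; i++ {
		fmt.Println(svc.getStatus("item1").status)
	}
	// Prints:
	// Started
	// Completed
	// Completed
}
```

With a service shaped like this, a sync callback that compares the polled status with the cached one reports a change on the first two polls and no change afterwards, which matches the "Completed" output the example expects after several refresh cycles.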

Tested by

no test coverage detected