MCPcopy
hub / github.com/flyteorg/flyte / ExampleNewAutoRefreshCache

Function ExampleNewAutoRefreshCache

flytestdlib/cache/auto_refresh_example_test.go:60–129  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

58}
59
60func ExampleNewAutoRefreshCache() {
61 // This auto-refresh cache can be used for cases where keys are created by caller but processed by
62 // an external service and we want to asynchronously keep track of its progress.
63 exampleService := newExampleService()
64
65 // define a sync method that the cache can use to auto-refresh in background
66 syncItemCb := func(ctx context.Context, batch []ItemWrapper) ([]ItemSyncResponse, error) {
67 updatedItems := make([]ItemSyncResponse, 0, len(batch))
68 for _, obj := range batch {
69 oldItem := obj.GetItem().(*ExampleCacheItem)
70 newItem := exampleService.getStatus(oldItem.ID())
71 if newItem.status != oldItem.status {
72 updatedItems = append(updatedItems, ItemSyncResponse{
73 ID: oldItem.ID(),
74 Item: newItem,
75 Action: Update,
76 })
77 }
78 }
79
80 return updatedItems, nil
81 }
82
83 // define resync period as time duration we want cache to refresh. We can go as low as we want but cache
84 // would still be constrained by time it takes to run Sync call for each item.
85 resyncPeriod := time.Millisecond
86
87 // Since number of items in the cache is dynamic, rate limiter is our knob to control resources we spend on
88 // sync.
89 rateLimiter := workqueue.DefaultTypedControllerRateLimiter[*Batch]()
90
91 // since cache refreshes itself asynchronously, it may not notice that an object has been deleted immediately,
92 // so users of the cache should have the delete logic aware of this shortcoming (eg. not-exists may be a valid
93 // error during removal if based on status in cache).
94 cache, err := NewAutoRefreshCache("my-cache", syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope())
95 if err != nil {
96 panic(err)
97 }
98
99 // start the cache with a context that would be to stop the cache by cancelling the context
100 ctx, cancel := context.WithCancel(context.Background())
101 err = cache.Start(ctx)
102 if err != nil {
103 panic(err)
104 }
105
106 // creating objects that go through a couple of state transitions to reach the final state.
107 item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
108 item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
109 _, err1 := cache.GetOrCreate(item1.id, item1)
110 _, err2 := cache.GetOrCreate(item2.id, item2)
111 if err1 != nil || err2 != nil {
112 fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
113 }
114
115 // wait for the cache to go through a few refresh cycles and then check status
116 time.Sleep(resyncPeriod * 10)
117 item, err := cache.Get(item1.ID())

Callers

nothing calls this directly

Calls 12

StartMethod · 0.95
GetOrCreateMethod · 0.95
GetMethod · 0.95
IDMethod · 0.95
NewTestScopeFunction · 0.92
IsCausedByFunction · 0.92
newExampleServiceFunction · 0.70
NewAutoRefreshCacheFunction · 0.70
GetItemMethod · 0.65
IDMethod · 0.65
PrintfMethod · 0.65
getStatusMethod · 0.45

Tested by

no test coverage detected