MCPcopy Index your code
hub / github.com/riverqueue/river / ResumableStepCursor

Function ResumableStepCursor

resumable.go:107–151  ·  view source on GitHub ↗

ResumableStepCursor runs a resumable step that also receives a persisted cursor value from an earlier failed attempt, if one was recorded with ResumableSetCursor. Step names must be unique across all ResumableStep and ResumableStepCursor calls in the same Worker execution. The cursor type TCursor is user-defined; a stored cursor is decoded into it from JSON.

(ctx context.Context, name string, opts *StepOpts, stepFunc func(ctx context.Context, cursor TCursor) error)

Source from the content-addressed store, hash-verified

105//
106// opts may be nil.
107func ResumableStepCursor[TCursor any](ctx context.Context, name string, opts *StepOpts, stepFunc func(ctx context.Context, cursor TCursor) error) {
108 state := mustResumableState(ctx)
109 if state.Err != nil {
110 return
111 }
112 if !registerResumableStepName(state, name) {
113 return
114 }
115
116 if !state.ResumeMatched {
117 if name == state.ResumeStep {
118 state.CompletedStep = name
119 state.ResumeMatched = true
120
121 // If cursor data exists for this step, it was only partially
122 // completed on the previous attempt. Fall through to re-execute
123 // it with the cursor rather than skipping it.
124 if _, hasCursor := state.Cursors[name]; !hasCursor {
125 return
126 }
127 } else {
128 return
129 }
130 }
131
132 var cursor TCursor
133 if cursorBytes, ok := state.Cursors[name]; ok && len(cursorBytes) > 0 {
134 if err := json.Unmarshal(cursorBytes, &cursor); err != nil {
135 state.Err = fmt.Errorf("river: unmarshal resumable cursor for step %q: %w", name, err)
136 return
137 }
138 }
139
140 previousStepName := state.StepName
141 state.StepName = name
142 defer func() { state.StepName = previousStepName }()
143
144 if err := stepFunc(ctx, cursor); err != nil {
145 state.Err = err
146 return
147 }
148
149 state.CompletedStep = name
150 delete(state.Cursors, name)
151}
152
153func mustResumableState(ctx context.Context) *rivermiddleware.ResumableState {
154 state, ok := resumableStateFromContext(ctx)

Callers 4

WorkMethod · 0.92
TestResumableStepCursorFunction · 0.85
WorkMethod · 0.85

Calls 3

mustResumableStateFunction · 0.85
ErrorfMethod · 0.65

Tested by 4

WorkMethod · 0.74
TestResumableStepCursorFunction · 0.68
WorkMethod · 0.68

Used in the wild real call sites across dependent graphs

searching dependent graphs…