ResumableStepCursor runs a resumable step that also receives a persisted cursor value from an earlier failed attempt, if one was recorded with ResumableSetCursor. Step names must be unique across all ResumableStep and ResumableStepCursor calls in the same Worker execution. The cursor type TCursor is user-defined; a persisted cursor is decoded into it with json.Unmarshal.
(ctx context.Context, name string, opts *StepOpts, stepFunc func(ctx context.Context, cursor TCursor) error)
| 105 | // |
| 106 | // opts may be nil. |
| 107 | func ResumableStepCursor[TCursor any](ctx context.Context, name string, opts *StepOpts, stepFunc func(ctx context.Context, cursor TCursor) error) { |
| 108 | state := mustResumableState(ctx) |
| 109 | if state.Err != nil { |
| 110 | return |
| 111 | } |
| 112 | if !registerResumableStepName(state, name) { |
| 113 | return |
| 114 | } |
| 115 | |
| 116 | if !state.ResumeMatched { |
| 117 | if name == state.ResumeStep { |
| 118 | state.CompletedStep = name |
| 119 | state.ResumeMatched = true |
| 120 | |
| 121 | // If cursor data exists for this step, it was only partially |
| 122 | // completed on the previous attempt. Fall through to re-execute |
| 123 | // it with the cursor rather than skipping it. |
| 124 | if _, hasCursor := state.Cursors[name]; !hasCursor { |
| 125 | return |
| 126 | } |
| 127 | } else { |
| 128 | return |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | var cursor TCursor |
| 133 | if cursorBytes, ok := state.Cursors[name]; ok && len(cursorBytes) > 0 { |
| 134 | if err := json.Unmarshal(cursorBytes, &cursor); err != nil { |
| 135 | state.Err = fmt.Errorf("river: unmarshal resumable cursor for step %q: %w", name, err) |
| 136 | return |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | previousStepName := state.StepName |
| 141 | state.StepName = name |
| 142 | defer func() { state.StepName = previousStepName }() |
| 143 | |
| 144 | if err := stepFunc(ctx, cursor); err != nil { |
| 145 | state.Err = err |
| 146 | return |
| 147 | } |
| 148 | |
| 149 | state.CompletedStep = name |
| 150 | delete(state.Cursors, name) |
| 151 | } |
| 152 | |
| 153 | func mustResumableState(ctx context.Context) *rivermiddleware.ResumableState { |
| 154 | state, ok := resumableStateFromContext(ctx) |
searching dependent graphs…