(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error)
| 28 | func (*ResumableMiddleware) IsMiddleware() bool { return true } |
| 29 | |
| 30 | func (*ResumableMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { |
| 31 | metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) |
| 32 | if !hasMetadataUpdates { |
| 33 | return errors.New("expected to find metadata updates in context, but didn't") |
| 34 | } |
| 35 | |
| 36 | state := &ResumableState{ |
| 37 | AllStepNames: make(map[string]struct{}), |
| 38 | Cursors: make(map[string]json.RawMessage), |
| 39 | ResumeMatched: true, |
| 40 | ResumeStep: gjson.GetBytes(job.Metadata, rivercommon.MetadataKeyResumableStep).Str, |
| 41 | } |
| 42 | if state.ResumeStep != "" { |
| 43 | state.ResumeMatched = false |
| 44 | } |
| 45 | |
| 46 | hadCursors := false |
| 47 | if cursorJSON := gjson.GetBytes(job.Metadata, rivercommon.MetadataKeyResumableCursor); cursorJSON.Exists() && cursorJSON.Type == gjson.JSON { |
| 48 | if err := json.Unmarshal([]byte(cursorJSON.Raw), &state.Cursors); err != nil { |
| 49 | return fmt.Errorf("river: unmarshal resumable cursors: %w", err) |
| 50 | } |
| 51 | hadCursors = len(state.Cursors) > 0 |
| 52 | } |
| 53 | |
| 54 | ctx = context.WithValue(ctx, ResumableContextKey{}, state) |
| 55 | |
| 56 | err := doInner(ctx) |
| 57 | if err == nil { |
| 58 | switch { |
| 59 | case state.Err != nil: |
| 60 | err = state.Err |
| 61 | case state.ResumeStep != "" && !state.ResumeMatched: |
| 62 | err = fmt.Errorf("river: resumable step %q not found in Worker", state.ResumeStep) |
| 63 | } |
| 64 | } |
| 65 | |
| 66 | if err != nil && state.CompletedStep != "" { |
| 67 | if len(state.Cursors) > 0 { |
| 68 | metadataUpdates[rivercommon.MetadataKeyResumableCursor] = state.Cursors |
| 69 | } else if hadCursors { |
| 70 | // All cursors were consumed (their steps completed). Write null |
| 71 | // to clear the stale cursor metadata, since the metadata merge |
| 72 | // is additive and wouldn't remove the old key on its own. |
| 73 | metadataUpdates[rivercommon.MetadataKeyResumableCursor] = nil |
| 74 | } |
| 75 | metadataUpdates[rivercommon.MetadataKeyResumableStep] = state.CompletedStep |
| 76 | } |
| 77 | |
| 78 | return err |
| 79 | } |
| 80 | |
| 81 | // ResumableState holds the state for a resumable job execution. It is stored in |
| 82 | // the context and accessed by ResumableStep and ResumableStepCursor. |
nothing calls this directly
no test coverage detected