MCPcopy Index your code
hub / github.com/riverqueue/river / Work

Method Work

internal/rivermiddleware/middleware.go:30–79  ·  view source on GitHub ↗
(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error)

Source from the content-addressed store, hash-verified

28func (*ResumableMiddleware) IsMiddleware() bool { return true }
29
30func (*ResumableMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
31 metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
32 if !hasMetadataUpdates {
33 return errors.New("expected to find metadata updates in context, but didn't")
34 }
35
36 state := &ResumableState{
37 AllStepNames: make(map[string]struct{}),
38 Cursors: make(map[string]json.RawMessage),
39 ResumeMatched: true,
40 ResumeStep: gjson.GetBytes(job.Metadata, rivercommon.MetadataKeyResumableStep).Str,
41 }
42 if state.ResumeStep != "" {
43 state.ResumeMatched = false
44 }
45
46 hadCursors := false
47 if cursorJSON := gjson.GetBytes(job.Metadata, rivercommon.MetadataKeyResumableCursor); cursorJSON.Exists() && cursorJSON.Type == gjson.JSON {
48 if err := json.Unmarshal([]byte(cursorJSON.Raw), &state.Cursors); err != nil {
49 return fmt.Errorf("river: unmarshal resumable cursors: %w", err)
50 }
51 hadCursors = len(state.Cursors) > 0
52 }
53
54 ctx = context.WithValue(ctx, ResumableContextKey{}, state)
55
56 err := doInner(ctx)
57 if err == nil {
58 switch {
59 case state.Err != nil:
60 err = state.Err
61 case state.ResumeStep != "" && !state.ResumeMatched:
62 err = fmt.Errorf("river: resumable step %q not found in Worker", state.ResumeStep)
63 }
64 }
65
66 if err != nil && state.CompletedStep != "" {
67 if len(state.Cursors) > 0 {
68 metadataUpdates[rivercommon.MetadataKeyResumableCursor] = state.Cursors
69 } else if hadCursors {
70 // All cursors were consumed (their steps completed). Write null
71 // to clear the stale cursor metadata, since the metadata merge
72 // is additive and wouldn't remove the old key on its own.
73 metadataUpdates[rivercommon.MetadataKeyResumableCursor] = nil
74 }
75 metadataUpdates[rivercommon.MetadataKeyResumableStep] = state.CompletedStep
76 }
77
78 return err
79}
80
81// ResumableState holds the state for a resumable job execution. It is stored in
82// the context and accessed by ResumableStep and ResumableStepCursor.

Callers

nothing calls this directly

Calls 2

ErrorfMethod · 0.65

Tested by

no test coverage detected