MCPcopy Index your code
hub / github.com/riverqueue/river / resumableSetStepTx

Function resumableSetStepTx

resumable_step_tx.go:54–124  ·  view source on GitHub ↗
(ctx context.Context, tx TTx, job *Job[TArgs], cursor json.RawMessage)

Source from the content-addressed store, hash-verified

52}
53
54func resumableSetStepTx[TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs], cursor json.RawMessage) (*Job[TArgs], error) {
55 if job.State != rivertype.JobStateRunning {
56 return nil, errors.New("job must be running")
57 }
58
59 state, ok := resumableStateFromContext(ctx)
60 if !ok {
61 return nil, errors.New("not inside a resumable step; must be called from within ResumableStep or ResumableStepCursor")
62 }
63 if state.StepName == "" {
64 return nil, errors.New("not inside a resumable step; must be called from within ResumableStep or ResumableStepCursor")
65 }
66
67 step := state.StepName
68
69 client := ClientFromContext[TTx](ctx)
70 if client == nil {
71 return nil, errors.New("client not found in context, can only work within a River worker")
72 }
73
74 metadataUpdates := map[string]any{
75 rivercommon.MetadataKeyResumableStep: step,
76 }
77
78 state.CompletedStep = step
79 if cursor != nil {
80 if state.Cursors == nil {
81 state.Cursors = make(map[string]json.RawMessage)
82 }
83 state.Cursors[step] = cursor
84 }
85 if len(state.Cursors) > 0 {
86 metadataUpdates[rivercommon.MetadataKeyResumableCursor] = state.Cursors
87 }
88
89 workMetadataUpdates, hasWorkMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
90 if hasWorkMetadataUpdates {
91 workMetadataUpdates[rivercommon.MetadataKeyResumableStep] = step
92 if resumableCursorMetadata, ok := metadataUpdates[rivercommon.MetadataKeyResumableCursor]; ok {
93 workMetadataUpdates[rivercommon.MetadataKeyResumableCursor] = resumableCursorMetadata
94 }
95 }
96
97 metadataUpdatesBytes, err := json.Marshal(metadataUpdates)
98 if err != nil {
99 return nil, err
100 }
101
102 updatedJob, err := client.Driver().UnwrapExecutor(tx).JobUpdate(ctx, &riverdriver.JobUpdateParams{
103 ID: job.ID,
104 MetadataDoMerge: true,
105 Metadata: metadataUpdatesBytes,
106 Schema: client.config.Schema,
107 })
108 if err != nil {
109 if errors.Is(err, rivertype.ErrNotFound) {
110 if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker {
111 panic("to use ResumableSetStepTx or ResumableSetStepCursorTx in a rivertest.Worker, the job must be inserted into the database first")

Callers 2

ResumableSetStepTxFunction · 0.85
ResumableSetStepCursorTxFunction · 0.85

Calls 7

DriverMethod · 0.80
JobUpdateMethod · 0.65
UnwrapExecutorMethod · 0.65
IsMethod · 0.45
ValueMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…