(ctx context.Context, tx TTx, job *Job[TArgs], cursor json.RawMessage)
| 52 | } |
| 53 | |
| 54 | func resumableSetStepTx[TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs], cursor json.RawMessage) (*Job[TArgs], error) { |
| 55 | if job.State != rivertype.JobStateRunning { |
| 56 | return nil, errors.New("job must be running") |
| 57 | } |
| 58 | |
| 59 | state, ok := resumableStateFromContext(ctx) |
| 60 | if !ok { |
| 61 | return nil, errors.New("not inside a resumable step; must be called from within ResumableStep or ResumableStepCursor") |
| 62 | } |
| 63 | if state.StepName == "" { |
| 64 | return nil, errors.New("not inside a resumable step; must be called from within ResumableStep or ResumableStepCursor") |
| 65 | } |
| 66 | |
| 67 | step := state.StepName |
| 68 | |
| 69 | client := ClientFromContext[TTx](ctx) |
| 70 | if client == nil { |
| 71 | return nil, errors.New("client not found in context, can only work within a River worker") |
| 72 | } |
| 73 | |
| 74 | metadataUpdates := map[string]any{ |
| 75 | rivercommon.MetadataKeyResumableStep: step, |
| 76 | } |
| 77 | |
| 78 | state.CompletedStep = step |
| 79 | if cursor != nil { |
| 80 | if state.Cursors == nil { |
| 81 | state.Cursors = make(map[string]json.RawMessage) |
| 82 | } |
| 83 | state.Cursors[step] = cursor |
| 84 | } |
| 85 | if len(state.Cursors) > 0 { |
| 86 | metadataUpdates[rivercommon.MetadataKeyResumableCursor] = state.Cursors |
| 87 | } |
| 88 | |
| 89 | workMetadataUpdates, hasWorkMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) |
| 90 | if hasWorkMetadataUpdates { |
| 91 | workMetadataUpdates[rivercommon.MetadataKeyResumableStep] = step |
| 92 | if resumableCursorMetadata, ok := metadataUpdates[rivercommon.MetadataKeyResumableCursor]; ok { |
| 93 | workMetadataUpdates[rivercommon.MetadataKeyResumableCursor] = resumableCursorMetadata |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | metadataUpdatesBytes, err := json.Marshal(metadataUpdates) |
| 98 | if err != nil { |
| 99 | return nil, err |
| 100 | } |
| 101 | |
| 102 | updatedJob, err := client.Driver().UnwrapExecutor(tx).JobUpdate(ctx, &riverdriver.JobUpdateParams{ |
| 103 | ID: job.ID, |
| 104 | MetadataDoMerge: true, |
| 105 | Metadata: metadataUpdatesBytes, |
| 106 | Schema: client.config.Schema, |
| 107 | }) |
| 108 | if err != nil { |
| 109 | if errors.Is(err, rivertype.ErrNotFound) { |
| 110 | if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker { |
| 111 | panic("to use ResumableSetStepTx or ResumableSetStepCursorTx in a rivertest.Worker, the job must be inserted into the database first") |
no test coverage detected
searching dependent graphs…