ResumableStepAtCursor configures insert options so that the job resumes at the named step with the provided cursor data. The named step and all subsequent steps will execute, with all steps before it skipped. Used with [Worker.Work], simulates a job that was partway through a [river.ResumableStepCu
(opts *river.InsertOpts, stepName string, cursor TCursor)
| 33 | // result, err := testWorker.Work(ctx, t, tx, args, |
| 34 | // rivertest.ResumableStepAtCursor(&river.InsertOpts{}, "process_ids", MyCursor{LastID: 42})) |
| 35 | func ResumableStepAtCursor[TCursor any](opts *river.InsertOpts, stepName string, cursor TCursor) *river.InsertOpts { |
| 36 | cursorBytes, err := json.Marshal(cursor) |
| 37 | if err != nil { |
| 38 | panic(fmt.Sprintf("rivertest: marshal resumable cursor: %s", err)) |
| 39 | } |
| 40 | |
| 41 | mergeResumableMetadata(opts, stepName, map[string]json.RawMessage{ |
| 42 | stepName: json.RawMessage(cursorBytes), |
| 43 | }) |
| 44 | return opts |
| 45 | } |
| 46 | |
| 47 | func mergeResumableMetadata(opts *river.InsertOpts, step string, cursors map[string]json.RawMessage) { |
| 48 | var existing map[string]any |
searching dependent graphs…