(t *testing.T)
| 78 | } |
| 79 | |
| 80 | func TestResumableOptsIntegration(t *testing.T) { |
| 81 | t.Parallel() |
| 82 | |
| 83 | ctx := context.Background() |
| 84 | |
| 85 | type testBundle struct { |
| 86 | driver *riverpgxv5.Driver |
| 87 | tx pgx.Tx |
| 88 | } |
| 89 | |
| 90 | setup := func(t *testing.T) *testBundle { |
| 91 | t.Helper() |
| 92 | |
| 93 | return &testBundle{ |
| 94 | driver: riverpgxv5.New(nil), |
| 95 | tx: riverdbtest.TestTxPgx(ctx, t), |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | t.Run("ResumableStepAfterSkipsCompletedSteps", func(t *testing.T) { |
| 100 | t.Parallel() |
| 101 | |
| 102 | bundle := setup(t) |
| 103 | |
| 104 | var ran []string |
| 105 | worker := river.WorkFunc(func(ctx context.Context, job *river.Job[resumableTestArgs]) error { |
| 106 | river.ResumableStep(ctx, "step1", nil, func(ctx context.Context) error { |
| 107 | ran = append(ran, "step1") |
| 108 | return nil |
| 109 | }) |
| 110 | river.ResumableStep(ctx, "step2", nil, func(ctx context.Context) error { |
| 111 | ran = append(ran, "step2") |
| 112 | return nil |
| 113 | }) |
| 114 | river.ResumableStep(ctx, "step3", nil, func(ctx context.Context) error { |
| 115 | ran = append(ran, "step3") |
| 116 | return nil |
| 117 | }) |
| 118 | return nil |
| 119 | }) |
| 120 | |
| 121 | config := &river.Config{ID: "rivertest-resumable"} |
| 122 | tw := NewWorker(t, bundle.driver, config, worker) |
| 123 | |
| 124 | opts := &river.InsertOpts{} |
| 125 | ResumableStepAfter(opts, "step1") |
| 126 | |
| 127 | result, err := tw.Work(ctx, t, bundle.tx, resumableTestArgs{}, opts) |
| 128 | require.NoError(t, err) |
| 129 | require.Equal(t, river.EventKindJobCompleted, result.EventKind) |
| 130 | require.Equal(t, []string{"step2", "step3"}, ran) |
| 131 | }) |
| 132 | |
| 133 | t.Run("ResumeAtCursorStepPassesCursor", func(t *testing.T) { |
| 134 | t.Parallel() |
| 135 | |
| 136 | bundle := setup(t) |
| 137 |
nothing calls this directly
no test coverage detected
searching dependent graphs…