MCPcopy Index your code
hub / github.com/riverqueue/river / TestResumableSetStepTx

Function TestResumableSetStepTx

resumable_step_tx_test.go:25–169  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

23)
24
25func TestResumableSetStepTx(t *testing.T) {
26 t.Parallel()
27
28 ctx := context.Background()
29
30 type JobArgs struct {
31 testutil.JobArgsReflectKind[JobArgs]
32 }
33
34 type testBundle struct {
35 client *Client[pgx.Tx]
36 exec riverdriver.Executor
37 tx pgx.Tx
38 }
39
40 setup := func(ctx context.Context, t *testing.T, stepName string) (context.Context, *testBundle) {
41 t.Helper()
42
43 tx := riverdbtest.TestTxPgx(ctx, t)
44 client, err := NewClient(riverpgxv5.New(nil), &Config{
45 Logger: riversharedtest.Logger(t),
46 })
47 require.NoError(t, err)
48 ctx = context.WithValue(ctx, rivercommon.ContextKeyClient{}, client)
49 ctx = context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, make(map[string]any))
50 ctx = context.WithValue(ctx, rivermiddleware.ResumableContextKey{}, &rivermiddleware.ResumableState{
51 Cursors: make(map[string]json.RawMessage),
52 StepName: stepName,
53 })
54
55 return ctx, &testBundle{
56 client: client,
57 exec: riverpgxv5.New(nil).UnwrapExecutor(tx),
58 tx: tx,
59 }
60 }
61
62 t.Run("SetsStep", func(t *testing.T) {
63 t.Parallel()
64
65 ctx, bundle := setup(ctx, t, "step1")
66
67 job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
68 State: ptrutil.Ptr(rivertype.JobStateRunning),
69 })
70
71 updatedJob, err := ResumableSetStepTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
72 require.NoError(t, err)
73 require.Equal(t, rivertype.JobStateRunning, updatedJob.State)
74
75 reloadedJob, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID})
76 require.NoError(t, err)
77
78 var metadata map[string]any
79 require.NoError(t, json.Unmarshal(reloadedJob.Metadata, &metadata))
80 require.Equal(t, "step1", metadata[rivercommon.MetadataKeyResumableStep])
81 })
82

Callers

nothing calls this directly

Calls 15

TestTxPgx · Function · 0.92
New · Function · 0.92
Logger · Function · 0.92
Job · Function · 0.92
Ptr · Function · 0.92
NewClient · Function · 0.85
ResumableSetStepTx · Function · 0.85
ResumableSetStepCursorTx · Function · 0.85
JobDeleteTx · Method · 0.80
Helper · Method · 0.65
UnwrapExecutor · Method · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…