MCPcopy Index your code
hub / github.com/riverqueue/river / Work

Method Work

example_resumable_set_step_tx_test.go:34–63  ·  view source on GitHub ↗
(ctx context.Context, job *river.Job[ResumableStepTxArgs])

Source from the content-addressed store, hash-verified

32}
33
34func (w *ResumableStepTxWorker) Work(ctx context.Context, job *river.Job[ResumableStepTxArgs]) error {
35 const durableStep = "durable_step"
36
37 river.ResumableStep(ctx, durableStep, nil, func(ctx context.Context) error {
38 tx, err := w.dbPool.Begin(ctx)
39 if err != nil {
40 return err
41 }
42 defer tx.Rollback(ctx)
43
44 // Perform some kind database work in a transaction.
45 var result int
46 if err := tx.QueryRow(ctx, "SELECT 1").Scan(&result); err != nil {
47 return err
48 }
49
50 // Then, record the step as completed in the same transaction.
51 if _, err := river.ResumableSetStepTx[*riverpgxv5.Driver](ctx, tx, job); err != nil {
52 return err
53 }
54
55 if err := tx.Commit(ctx); err != nil {
56 return err
57 }
58
59 return errors.New("simulated failure after persisting step")
60 })
61
62 return nil
63}
64
65// Example_resumableSetStepTx demonstrates how to transactionally persist a
66// resumable step so it survives a failed attempt.

Callers

nothing calls this directly

Calls 7

ResumableStepFunction · 0.92
ResumableSetStepTxFunction · 0.92
BeginMethod · 0.65
RollbackMethod · 0.65
ScanMethod · 0.65
QueryRowMethod · 0.65
CommitMethod · 0.65

Tested by

no test coverage detected