| 28 | } |
| 29 | |
| 30 | func (w *TransactionalWorker) Work(ctx context.Context, job *river.Job[TransactionalArgs]) error { |
| 31 | tx, err := w.dbPool.Begin(ctx) |
| 32 | if err != nil { |
| 33 | return err |
| 34 | } |
| 35 | defer tx.Rollback(ctx) |
| 36 | |
| 37 | var result int |
| 38 | if err := tx.QueryRow(ctx, "SELECT 1").Scan(&result); err != nil { |
| 39 | return err |
| 40 | } |
| 41 | |
| 42 | // The function needs to know the type of the database driver in use by the |
| 43 | // Client, but the other generic parameters can be inferred. |
| 44 | jobAfter, err := river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job) |
| 45 | if err != nil { |
| 46 | return err |
| 47 | } |
| 48 | fmt.Printf("Transitioned TransactionalWorker job from %q to %q\n", job.State, jobAfter.State) |
| 49 | |
| 50 | if err = tx.Commit(ctx); err != nil { |
| 51 | return err |
| 52 | } |
| 53 | return nil |
| 54 | } |
| 55 | |
| 56 | // Example_completeJobWithinTx demonstrates how to transactionally complete |
| 57 | // a job alongside other database changes being made. |