MCPcopy
hub / github.com/riverqueue/river / Example_insertAndWork

Function Example_insertAndWork

example_insert_and_work_test.go:35–98  ·  view source on GitHub ↗

Example_insertAndWork demonstrates how to register job workers, start a client, and insert a job on it to be worked.

()

Source from the content-addressed store, hash-verified

33// Example_insertAndWork demonstrates how to register job workers, start a
34// client, and insert a job on it to be worked.
35func Example_insertAndWork() {
36 ctx := context.Background()
37
38 dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
39 if err != nil {
40 panic(err)
41 }
42 defer dbPool.Close()
43
44 workers := river.NewWorkers()
45 river.AddWorker(workers, &SortWorker{})
46
47 riverClient, err := river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{
48 Queues: map[string]river.QueueConfig{
49 river.QueueDefault: {MaxWorkers: 100},
50 },
51 Workers: workers,
52 }))
53 if err != nil {
54 panic(err)
55 }
56
57 // Out of example scope, but used to wait until a job is worked.
58 subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
59 defer subscribeCancel()
60
61 if err := riverClient.Start(ctx); err != nil {
62 panic(err)
63 }
64
65 // Start a transaction to insert a job. It's also possible to insert a job
66 // outside a transaction, but this usage is recommended to ensure that all
67 // data a job needs to run is available by the time it starts. Because of
68 // snapshot visibility guarantees across transactions, the job will not be
69 // worked until the transaction has committed.
70 tx, err := dbPool.Begin(ctx)
71 if err != nil {
72 panic(err)
73 }
74 defer tx.Rollback(ctx)
75
76 _, err = riverClient.InsertTx(ctx, tx, SortArgs{
77 Strings: []string{
78 "whale", "tiger", "bear",
79 },
80 }, nil)
81 if err != nil {
82 panic(err)
83 }
84
85 if err := tx.Commit(ctx); err != nil {
86 panic(err)
87 }
88
89 // Wait for jobs to complete. Only needed for purposes of the example test.
90 riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
91
92 if err := riverClient.Stop(ctx); err != nil {

Callers

nothing calls this directly

Calls 15

TestDatabaseURLFunction · 0.92
NewWorkersFunction · 0.92
AddWorkerFunction · 0.92
NewClientFunction · 0.92
NewFunction · 0.92
WaitOrTimeoutNFunction · 0.92
PanicTBFunction · 0.92
SubscribeMethod · 0.80
InsertTxMethod · 0.80
initTestConfigFunction · 0.70
CloseMethod · 0.65
StartMethod · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…