MCPcopy Index your code
hub / github.com/riverqueue/river / Example_uniqueJob

Function Example_uniqueJob

example_unique_job_test.go:63–128  ·  view source on GitHub ↗

Example_uniqueJob demonstrates the use of a job with custom job-specific insertion options.

()

Source from the content-addressed store, hash-verified

61// Example_uniqueJob demonstrates the use of a job with custom
62// job-specific insertion options.
63func Example_uniqueJob() {
64 ctx := context.Background()
65
66 dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
67 if err != nil {
68 panic(err)
69 }
70 defer dbPool.Close()
71
72 workers := river.NewWorkers()
73 river.AddWorker(workers, &ReconcileAccountWorker{})
74
75 riverClient, err := river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{
76 Queues: map[string]river.QueueConfig{
77 river.QueueDefault: {MaxWorkers: 100},
78 },
79 Workers: workers,
80 }))
81 if err != nil {
82 panic(err)
83 }
84
85 // Out of example scope, but used to wait until a job is worked.
86 subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
87 defer subscribeCancel()
88
89 if err := riverClient.Start(ctx); err != nil {
90 panic(err)
91 }
92
93 // First job insertion for account 1.
94 _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
95 if err != nil {
96 panic(err)
97 }
98
99 // Job is inserted a second time, but it doesn't matter because its unique
100 // args cause the insertion to be skipped because it's meant to only run
101 // once per account per 24 hour period.
102 _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 1}, nil)
103 if err != nil {
104 panic(err)
105 }
106
107 // Cheat a little by waiting for the first job to come back so we can
108 // guarantee that this example's output comes out in order.
109 // Wait for jobs to complete. Only needed for purposes of the example test.
110 riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
111
112 // Because the job is unique ByArgs, another job for account 2 is allowed.
113 _, err = riverClient.Insert(ctx, ReconcileAccountArgs{AccountID: 2}, nil)
114 if err != nil {
115 panic(err)
116 }
117
118 // Wait for jobs to complete. Only needed for purposes of the example test.
119 riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)
120

Callers

nothing calls this directly

Calls 13

TestDatabaseURLFunction · 0.92
NewWorkersFunction · 0.92
AddWorkerFunction · 0.92
NewClientFunction · 0.92
NewFunction · 0.92
WaitOrTimeoutNFunction · 0.92
PanicTBFunction · 0.92
SubscribeMethod · 0.80
InsertMethod · 0.80
initTestConfigFunction · 0.70
CloseMethod · 0.65
StartMethod · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…