MCPcopy Index your code
hub / github.com/riverqueue/river / Example_subscription

Function Example_subscription

example_subscription_test.go:42–133  ·  view source on GitHub ↗

Example_subscription demonstrates the use of client subscriptions to receive events containing information about worked jobs.

()

Source from the content-addressed store, hash-verified

40// Example_subscription demonstrates the use of client subscriptions to receive
41// events containing information about worked jobs.
42func Example_subscription() {
43 ctx := context.Background()
44
45 dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
46 if err != nil {
47 panic(err)
48 }
49 defer dbPool.Close()
50
51 workers := river.NewWorkers()
52 river.AddWorker(workers, &SubscriptionWorker{})
53
54 riverClient, err := river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{
55 Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.Level(9), ReplaceAttr: slogutil.NoLevelTime})), // Suppress logging so example output is cleaner (9 > slog.LevelError).
56 Queues: map[string]river.QueueConfig{
57 river.QueueDefault: {MaxWorkers: 100},
58 },
59 Workers: workers,
60 }))
61 if err != nil {
62 panic(err)
63 }
64
65 // Subscribers tell the River client the kinds of events they'd like to receive.
66 completedChan, completedSubscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
67 defer completedSubscribeCancel()
68
69 // Multiple simultaneous subscriptions are allowed.
70 failedChan, failedSubscribeCancel := riverClient.Subscribe(river.EventKindJobFailed)
71 defer failedSubscribeCancel()
72
73 otherChan, otherSubscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled, river.EventKindJobSnoozed)
74 defer otherSubscribeCancel()
75
76 if err := riverClient.Start(ctx); err != nil {
77 panic(err)
78 }
79
80 // Insert one job for each subscription above: one to succeed, one to fail,
81 // and one that's cancelled that'll arrive on the "other" channel.
82 _, err = riverClient.Insert(ctx, SubscriptionArgs{}, nil)
83 if err != nil {
84 panic(err)
85 }
86 _, err = riverClient.Insert(ctx, SubscriptionArgs{Fail: true}, nil)
87 if err != nil {
88 panic(err)
89 }
90 _, err = riverClient.Insert(ctx, SubscriptionArgs{Cancel: true}, nil)
91 if err != nil {
92 panic(err)
93 }
94
95 waitForJob := func(subscribeChan <-chan *river.Event) {
96 select {
97 case event := <-subscribeChan:
98 if event == nil {
99 fmt.Printf("Channel is closed\n")

Callers

nothing calls this directly

Calls 13

TestDatabaseURL (Function) · 0.92
NewWorkers (Function) · 0.92
AddWorker (Function) · 0.92
NewClient (Function) · 0.92
New (Function) · 0.92
WaitTimeout (Function) · 0.92
Subscribe (Method) · 0.80
Insert (Method) · 0.80
After (Method) · 0.80
initTestConfig (Function) · 0.70
Close (Method) · 0.65
Start (Method) · 0.65

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…