MCPcopy Index your code
hub / github.com/riverqueue/river / Example_queuePause

Function Example_queuePause

example_queue_pause_test.go:36–111  ·  view source on GitHub ↗

Example_queuePause demonstrates how to pause queues to prevent them from working new jobs, and later resume them.

()

Source from the content-addressed store, hash-verified

34// Example_queuePause demonstrates how to pause queues to prevent them from
35// working new jobs, and later resume them.
36func Example_queuePause() {
37 ctx := context.Background()
38
39 dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
40 if err != nil {
41 panic(err)
42 }
43 defer dbPool.Close()
44
45 const (
46 unreliableQueue = "unreliable_external_service"
47 reliableQueue = "reliable_jobs"
48 )
49
50 workers := river.NewWorkers()
51 jobWorkedCh := make(chan string)
52 river.AddWorker(workers, &ReportingWorker{jobWorkedCh: jobWorkedCh})
53
54 riverClient, err := river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{
55 Queues: map[string]river.QueueConfig{
56 unreliableQueue: {MaxWorkers: 10},
57 reliableQueue: {MaxWorkers: 10},
58 },
59 Workers: workers,
60 }))
61 if err != nil {
62 panic(err)
63 }
64
65 if err := riverClient.Start(ctx); err != nil {
66 panic(err)
67 }
68
69 // Out of example scope, but used to wait until a queue is paused or unpaused.
70 subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindQueuePaused, river.EventKindQueueResumed)
71 defer subscribeCancel()
72
73 fmt.Printf("Pausing %s queue\n", unreliableQueue)
74 if err := riverClient.QueuePause(ctx, unreliableQueue, nil); err != nil {
75 panic(err)
76 }
77
78 // Wait for queue to be paused:
79 waitOrTimeout(subscribeChan)
80
81 fmt.Println("Inserting one job each into unreliable and reliable queues")
82 if _, err = riverClient.Insert(ctx, ReportingArgs{}, &river.InsertOpts{Queue: unreliableQueue}); err != nil {
83 panic(err)
84 }
85 if _, err = riverClient.Insert(ctx, ReportingArgs{}, &river.InsertOpts{Queue: reliableQueue}); err != nil {
86 panic(err)
87 }
88 // The unreliable queue is paused so its job should get worked yet, while
89 // reliable queue is not paused so its job should get worked immediately:
90 receivedQueue := waitOrTimeout(jobWorkedCh)
91 fmt.Printf("Job worked on %s queue\n", receivedQueue)
92
93 // Resume the unreliable queue so it can work the job:

Callers

nothing calls this directly

Calls 14

TestDatabaseURLFunction · 0.92
NewWorkersFunction · 0.92
AddWorkerFunction · 0.92
NewClientFunction · 0.92
NewFunction · 0.92
waitOrTimeoutFunction · 0.85
SubscribeMethod · 0.80
InsertMethod · 0.80
initTestConfigFunction · 0.70
CloseMethod · 0.65
StartMethod · 0.65
QueuePauseMethod · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…