hub / github.com/riverqueue/river / Example_jobStuckHandler

Function Example_jobStuckHandler

example_job_stuck_handler_test.go:43–118

Example_jobStuckHandler demonstrates how to use JobStuckHandler to stop a client when too many jobs are stuck so the process can be restarted. For the first couple of stuck jobs it uses AddWorkerSlot to add worker slots to replace those occupied by stuck jobs, but after maxStuckJobsBeforeRestart it gives up and exits so it can be restarted.
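
The decision policy described above can be sketched without River itself. This is a minimal illustration only: stuckParams and stuckResult are hypothetical stand-ins that mirror just the TotalStuckJobs and AddWorkerSlot fields used in the example, and the extra restart return value is an assumption added to make the second branch visible. It is not River's API.

```go
package main

import "fmt"

// stuckParams is a hypothetical stand-in for the handler's input;
// it carries only the stuck-job count used by the policy.
type stuckParams struct {
	TotalStuckJobs int
}

// stuckResult is a hypothetical stand-in for the handler's result.
type stuckResult struct {
	AddWorkerSlot bool
}

const maxStuckJobsBeforeRestart = 2

// decide asks for a replacement worker slot while the stuck-job count is
// at or below the limit, and requests a restart once the limit is exceeded.
func decide(p stuckParams) (res stuckResult, restart bool) {
	if p.TotalStuckJobs <= maxStuckJobsBeforeRestart {
		return stuckResult{AddWorkerSlot: true}, false
	}
	return stuckResult{}, true
}

func main() {
	for n := 1; n <= 3; n++ {
		res, restart := decide(stuckParams{TotalStuckJobs: n})
		fmt.Printf("stuck=%d addSlot=%t restart=%t\n", n, res.AddWorkerSlot, restart)
	}
}
```

With the limit set to 2, the first two stuck jobs each get a replacement slot and the third triggers the restart branch.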


Source from the content-addressed store, hash-verified

// to replace those occupied by stuck jobs, but after maxStuckJobsBeforeRestart
// it gives up and exits so it can be restarted.
func Example_jobStuckHandler() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	var riverClient *river.Client[pgx.Tx]

	const maxStuckJobsBeforeRestart = 2

	var (
		releaseJobs      = make(chan struct{})
		restartRequested = make(chan struct{})
		started          = make(chan struct{}, maxStuckJobsBeforeRestart+1)
		stopOnce         sync.Once
	)

	workers := river.NewWorkers()
	river.AddWorker(workers, &StuckJobHandlerWorker{releaseJobs: releaseJobs, started: started})

	riverClient, err = river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{
		JobStuckHandler: func(ctx context.Context, params river.JobStuckHandlerParams) river.JobStuckHandlerResult {
			if params.TotalStuckJobs <= maxStuckJobsBeforeRestart {
				fmt.Printf("stuck jobs: %d; opening replacement worker slot\n", params.TotalStuckJobs)
				return river.JobStuckHandlerResult{AddWorkerSlot: true}
			}

			stopOnce.Do(func() {
				fmt.Printf("too many stuck jobs: %d; shutting down so the process can restart\n", params.TotalStuckJobs)
				close(restartRequested)

				shutdownCtx := context.WithoutCancel(ctx)
				go func() {
					if err := riverClient.Stop(shutdownCtx); err != nil {
						panic(err)
					}
				}()
			})

			return river.JobStuckHandlerResult{}
		},
		JobStuckThreshold: time.Millisecond,
		JobTimeout:        10 * time.Millisecond,
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 1},
		},
		Workers: workers,
	}))
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}
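
The shutdown branch of the handler above combines three standard-library pieces: sync.Once so the stop is requested a single time even if the handler fires again, context.WithoutCancel (Go 1.21+) so the stop call does not inherit cancellation from the handler's context, and a goroutine so the handler itself returns immediately. The sketch below isolates that pattern using only the standard library. The shutdowner type, its request method, and the demo function are hypothetical names for illustration, and the stop callback merely stands in for a call such as riverClient.Stop.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// shutdowner is a hypothetical helper that guards a one-time shutdown.
type shutdowner struct {
	once      sync.Once
	requested chan struct{} // closed when shutdown is first requested
	done      chan error    // receives the result of the stop callback
}

func newShutdowner() *shutdowner {
	return &shutdowner{
		requested: make(chan struct{}),
		done:      make(chan error, 1),
	}
}

// request runs stop at most once, in its own goroutine, on a context that
// keeps ctx's values but ignores its cancellation.
func (s *shutdowner) request(ctx context.Context, stop func(context.Context) error) {
	s.once.Do(func() {
		close(s.requested)
		shutdownCtx := context.WithoutCancel(ctx)
		go func() { s.done <- stop(shutdownCtx) }()
	})
}

// demo requests shutdown twice with an already-cancelled context and
// reports how often stop ran and what error it saw.
func demo() (calls int, err error) {
	s := newShutdowner()
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the caller's context being cancelled

	stop := func(c context.Context) error {
		calls++
		return c.Err() // nil, because the detached context is never cancelled
	}
	s.request(ctx, stop)
	s.request(ctx, stop) // no-op: sync.Once has already fired

	err = <-s.done
	return calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err) // prints "1 <nil>"
}
```

Running the stop call in a separate goroutine presumably matters in the original because a stop that waits for in-flight work could otherwise block on the very handler that invoked it; that reasoning is an inference from the listing, not something the source states.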

Callers

nothing calls this directly

Calls 13

TestDatabaseURL · Function · 0.92
NewWorkers · Function · 0.92
AddWorker · Function · 0.92
NewClient · Function · 0.92
New · Function · 0.92
WaitOrTimeoutN · Function · 0.92
PanicTB · Function · 0.92
Insert · Method · 0.80
initTestConfig · Function · 0.70
Close · Method · 0.65
Stop · Method · 0.65
Start · Method · 0.65

Tested by

no test coverage detected
