Example_jobStuckHandler demonstrates how to use JobStuckHandler to stop a client when too many jobs are stuck so the process can be restarted. For the first couple stuck jobs it uses AddWorkerSlot to add additional worker slots to replace those occupied by stuck jobs, but after maxStuckJobsBeforeRes
()
| 41 | // to replace those occupied by stuck jobs, but after maxStuckJobsBeforeRestart |
| 42 | // it gives up and exits so it can be restarted. |
| 43 | func Example_jobStuckHandler() { |
| 44 | ctx := context.Background() |
| 45 | |
| 46 | dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) |
| 47 | if err != nil { |
| 48 | panic(err) |
| 49 | } |
| 50 | defer dbPool.Close() |
| 51 | |
| 52 | var riverClient *river.Client[pgx.Tx] |
| 53 | |
| 54 | const maxStuckJobsBeforeRestart = 2 |
| 55 | |
| 56 | var ( |
| 57 | releaseJobs = make(chan struct{}) |
| 58 | restartRequested = make(chan struct{}) |
| 59 | started = make(chan struct{}, maxStuckJobsBeforeRestart+1) |
| 60 | stopOnce sync.Once |
| 61 | ) |
| 62 | |
| 63 | workers := river.NewWorkers() |
| 64 | river.AddWorker(workers, &StuckJobHandlerWorker{releaseJobs: releaseJobs, started: started}) |
| 65 | |
| 66 | riverClient, err = river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{ |
| 67 | JobStuckHandler: func(ctx context.Context, params river.JobStuckHandlerParams) river.JobStuckHandlerResult { |
| 68 | if params.TotalStuckJobs <= maxStuckJobsBeforeRestart { |
| 69 | fmt.Printf("stuck jobs: %d; opening replacement worker slot\n", params.TotalStuckJobs) |
| 70 | return river.JobStuckHandlerResult{AddWorkerSlot: true} |
| 71 | } |
| 72 | |
| 73 | stopOnce.Do(func() { |
| 74 | fmt.Printf("too many stuck jobs: %d; shutting down so the process can restart\n", params.TotalStuckJobs) |
| 75 | close(restartRequested) |
| 76 | |
| 77 | shutdownCtx := context.WithoutCancel(ctx) |
| 78 | go func() { |
| 79 | if err := riverClient.Stop(shutdownCtx); err != nil { |
| 80 | panic(err) |
| 81 | } |
| 82 | }() |
| 83 | }) |
| 84 | |
| 85 | return river.JobStuckHandlerResult{} |
| 86 | }, |
| 87 | JobStuckThreshold: time.Millisecond, |
| 88 | JobTimeout: 10 * time.Millisecond, |
| 89 | Queues: map[string]river.QueueConfig{ |
| 90 | river.QueueDefault: {MaxWorkers: 1}, |
| 91 | }, |
| 92 | Workers: workers, |
| 93 | })) |
| 94 | if err != nil { |
| 95 | panic(err) |
| 96 | } |
| 97 | |
| 98 | if err := riverClient.Start(ctx); err != nil { |
| 99 | panic(err) |
| 100 | } |
nothing calls this directly
no test coverage detected
searching dependent graphs…