MCPcopy Index your code
hub / github.com/riverqueue/river / Test_Client_Stop_Common

Function Test_Client_Stop_Common

client_test.go:2114–2341  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2112}
2113
2114func Test_Client_Stop_Common(t *testing.T) {
2115 t.Parallel()
2116
2117 ctx := context.Background()
2118
2119 // Performs continual job insertion on a number of background goroutines.
2120 // Returns a `finish` function that should be deferred to stop insertion and
2121 // safely stop goroutines.
2122 doParallelContinualInsertion := func(ctx context.Context, t *testing.T, client *Client[pgx.Tx], args JobArgs) func() {
2123 t.Helper()
2124
2125 ctx, cancel := context.WithCancel(ctx)
2126
2127 var wg sync.WaitGroup
2128 for range 10 {
2129 wg.Go(func() {
2130 for {
2131 select {
2132 case <-ctx.Done():
2133 return
2134 default:
2135 }
2136
2137 _, err := client.Insert(ctx, args, nil)
2138 // A cancelled context may produce a variety of underlying
2139 // errors in pgx, so rather than comparing the return error,
2140 // first check if context is cancelled, and ignore an error
2141 // return if it is.
2142 if ctx.Err() != nil {
2143 return
2144 }
2145 require.NoError(t, err)
2146
2147 // Sleep a brief time between inserts.
2148 serviceutil.CancellableSleep(ctx, randutil.DurationBetween(1*time.Microsecond, 10*time.Millisecond))
2149 }
2150 })
2151 }
2152
2153 return func() {
2154 cancel()
2155 wg.Wait()
2156 }
2157 }
2158
2159 t.Run("NoJobsInProgress", func(t *testing.T) {
2160 t.Parallel()
2161
2162 client := runNewTestClient(ctx, t, newTestConfig(t, ""))
2163
2164 // Shutdown should still complete promptly, but the client may need a full
2165 // leadership resign attempt before stopping, which can take about a second
2166 // under race instrumentation.
2167 ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
2168 defer cancel()
2169
2170 require.NoError(t, client.Stop(ctx))
2171 })

Callers

nothing calls this directly

Calls 15

CancellableSleep Function · 0.92
DurationBetween Function · 0.92
DBPool Function · 0.92
New Function · 0.92
TestSchema Function · 0.92
WaitOrTimeout Function · 0.92
runNewTestClient Function · 0.85
newTestClient Function · 0.85
AddWorker Function · 0.85
WorkFunc Function · 0.85
NewJobListParams Function · 0.85
Done Method · 0.80

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…