MCPcopy Index your code
hub / github.com/riverqueue/river / Test_Client_Maintenance

Function Test_Client_Maintenance

client_test.go:5570–6296  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

5568 }
5569
5570 func Test_Client_Maintenance(t *testing.T) {
5571 t.Parallel()
5572
5573 ctx := context.Background()
5574
5575 type testBundle struct {
5576 exec riverdriver.Executor
5577 schema string
5578 }
5579
5580 setup := func(t *testing.T, config *Config) (*Client[pgx.Tx], *testBundle) {
5581 t.Helper()
5582
5583 var (
5584 dbPool = riversharedtest.DBPool(ctx, t)
5585 driver = riverpgxv5.New(dbPool)
5586 schema = riverdbtest.TestSchema(ctx, t, driver, nil)
5587 )
5588 config.Schema = schema
5589
5590 client := newTestClient(t, dbPool, config)
5591 client.testSignals.Init(t)
5592
5593 return client, &testBundle{
5594 exec: client.driver.GetExecutor(),
5595 schema: schema,
5596 }
5597 }
5598
5599 // Starts the client, then waits for it to be elected leader and for the
5600 // queue maintainer to start.
5601 startAndWaitForQueueMaintainer := func(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
5602 t.Helper()
5603
5604 startClient(ctx, t, client)
5605 client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()
5606 riversharedtest.WaitOrTimeout(t, client.queueMaintainer.Started())
5607 }
5608
5609 t.Run("JobCleanerCleans", func(t *testing.T) {
5610 t.Parallel()
5611
5612 config := newTestConfig(t, "")
5613 config.CancelledJobRetentionPeriod = 1 * time.Hour
5614 config.CompletedJobRetentionPeriod = 1 * time.Hour
5615 config.DiscardedJobRetentionPeriod = 1 * time.Hour
5616
5617 client, bundle := setup(t, config)
5618
5619 deleteHorizon := time.Now().Add(-config.CompletedJobRetentionPeriod)
5620
5621 // Take care to insert jobs before starting the client because otherwise
5622 // there's a race condition where the cleaner could run its initial
5623 // pass before our insertion is complete.
5624 ineligibleJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateAvailable)})
5625 ineligibleJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateRunning)})
5626 ineligibleJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
5627

Callers

nothing calls this directly

Calls 15

DBPool · Function · 0.92
New · Function · 0.92
TestSchema · Function · 0.92
WaitOrTimeout · Function · 0.92
Job · Function · 0.92
Ptr · Function · 0.92
Queue · Function · 0.92
newTestClient · Function · 0.85
NewPeriodicJob · Function · 0.85
AddWorker · Function · 0.85
JobGet · Method · 0.80
PeriodicJobs · Method · 0.80

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…