(t *testing.T)
| 5568 | } |
| 5569 | |
| 5570 | func Test_Client_Maintenance(t *testing.T) { |
| 5571 | t.Parallel() |
| 5572 | |
| 5573 | ctx := context.Background() |
| 5574 | |
| 5575 | type testBundle struct { |
| 5576 | exec riverdriver.Executor |
| 5577 | schema string |
| 5578 | } |
| 5579 | |
| 5580 | setup := func(t *testing.T, config *Config) (*Client[pgx.Tx], *testBundle) { |
| 5581 | t.Helper() |
| 5582 | |
| 5583 | var ( |
| 5584 | dbPool = riversharedtest.DBPool(ctx, t) |
| 5585 | driver = riverpgxv5.New(dbPool) |
| 5586 | schema = riverdbtest.TestSchema(ctx, t, driver, nil) |
| 5587 | ) |
| 5588 | config.Schema = schema |
| 5589 | |
| 5590 | client := newTestClient(t, dbPool, config) |
| 5591 | client.testSignals.Init(t) |
| 5592 | |
| 5593 | return client, &testBundle{ |
| 5594 | exec: client.driver.GetExecutor(), |
| 5595 | schema: schema, |
| 5596 | } |
| 5597 | } |
| 5598 | |
| 5599 | // Starts the client, then waits for it to be elected leader and for the |
| 5600 | // queue maintainer to start. |
| 5601 | startAndWaitForQueueMaintainer := func(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) { |
| 5602 | t.Helper() |
| 5603 | |
| 5604 | startClient(ctx, t, client) |
| 5605 | client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout() |
| 5606 | riversharedtest.WaitOrTimeout(t, client.queueMaintainer.Started()) |
| 5607 | } |
| 5608 | |
| 5609 | t.Run("JobCleanerCleans", func(t *testing.T) { |
| 5610 | t.Parallel() |
| 5611 | |
| 5612 | config := newTestConfig(t, "") |
| 5613 | config.CancelledJobRetentionPeriod = 1 * time.Hour |
| 5614 | config.CompletedJobRetentionPeriod = 1 * time.Hour |
| 5615 | config.DiscardedJobRetentionPeriod = 1 * time.Hour |
| 5616 | |
| 5617 | client, bundle := setup(t, config) |
| 5618 | |
| 5619 | deleteHorizon := time.Now().Add(-config.CompletedJobRetentionPeriod) |
| 5620 | |
| 5621 | // Take care to insert jobs before starting the client because otherwise |
| 5622 | // there's a race condition where the cleaner could run its initial |
| 5623 | // pass before our insertion is complete. |
| 5624 | ineligibleJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateAvailable)}) |
| 5625 | ineligibleJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateRunning)}) |
| 5626 | ineligibleJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Schema: bundle.schema, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) |
| 5627 |
nothing calls this directly
no test coverage detected
searching dependent graphs…