SubscribeConfig uses all the same code as Subscribe, so these are just a minimal set of new tests to make sure that the function also works when used independently.
(t *testing.T)
| 7142 | // minimal set of new tests to make sure that the function also works when used |
| 7143 | // independently. |
| 7144 | func Test_Client_SubscribeConfig(t *testing.T) { |
| 7145 | t.Parallel() |
| 7146 | |
| 7147 | ctx := context.Background() |
| 7148 | |
| 7149 | t.Run("Success", func(t *testing.T) { |
| 7150 | t.Parallel() |
| 7151 | |
| 7152 | var ( |
| 7153 | dbPool = riversharedtest.DBPool(ctx, t) |
| 7154 | driver = riverpgxv5.New(dbPool) |
| 7155 | schema = riverdbtest.TestSchema(ctx, t, driver, nil) |
| 7156 | config = newTestConfig(t, schema) |
| 7157 | ) |
| 7158 | |
| 7159 | type JobArgs struct { |
| 7160 | testutil.JobArgsReflectKind[JobArgs] |
| 7161 | |
| 7162 | Name string `json:"name"` |
| 7163 | } |
| 7164 | |
| 7165 | // Fail/succeed jobs based on their name so we can get a mix of both to |
| 7166 | // verify. |
| 7167 | AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { |
| 7168 | if strings.HasPrefix(job.Args.Name, "failed") { |
| 7169 | return errors.New("job error") |
| 7170 | } |
| 7171 | return nil |
| 7172 | })) |
| 7173 | |
| 7174 | client := newTestClient(t, dbPool, config) |
| 7175 | |
| 7176 | subscribeChan, cancel := client.SubscribeConfig(&SubscribeConfig{ |
| 7177 | Kinds: []EventKind{EventKindJobCompleted, EventKindJobFailed}, |
| 7178 | }) |
| 7179 | t.Cleanup(cancel) |
| 7180 | |
| 7181 | keyEventsByName := func(events []*Event) map[string]*Event { |
| 7182 | return sliceutil.KeyBy(events, func(event *Event) (string, *Event) { |
| 7183 | var args JobArgs |
| 7184 | require.NoError(t, json.Unmarshal(event.Job.EncodedArgs, &args)) |
| 7185 | return args.Name, event |
| 7186 | }) |
| 7187 | } |
| 7188 | |
| 7189 | requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *rivertype.JobRow { |
| 7190 | insertRes, err := client.Insert(ctx, JobArgs{Name: jobName}, nil) |
| 7191 | require.NoError(t, err) |
| 7192 | return insertRes.Job |
| 7193 | } |
| 7194 | |
| 7195 | jobCompleted1 := requireInsert(ctx, client, "completed1") |
| 7196 | jobCompleted2 := requireInsert(ctx, client, "completed2") |
| 7197 | jobFailed1 := requireInsert(ctx, client, "failed1") |
| 7198 | jobFailed2 := requireInsert(ctx, client, "failed2") |
| 7199 | |
| 7200 | expectedJobs := []*rivertype.JobRow{ |
| 7201 | jobCompleted1, |
nothing calls this directly
no test coverage detected
searching dependent graphs…