(t *testing.T)
| 6877 | } |
| 6878 | |
| 6879 | func Test_Client_Subscribe(t *testing.T) { |
| 6880 | t.Parallel() |
| 6881 | |
| 6882 | ctx := context.Background() |
| 6883 | |
| 6884 | type JobArgs struct { |
| 6885 | testutil.JobArgsReflectKind[JobArgs] |
| 6886 | |
| 6887 | Name string `json:"name"` |
| 6888 | } |
| 6889 | |
| 6890 | keyEventsByName := func(events []*Event) map[string]*Event { |
| 6891 | return sliceutil.KeyBy(events, func(event *Event) (string, *Event) { |
| 6892 | var args JobArgs |
| 6893 | require.NoError(t, json.Unmarshal(event.Job.EncodedArgs, &args)) |
| 6894 | return args.Name, event |
| 6895 | }) |
| 6896 | } |
| 6897 | |
| 6898 | requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *rivertype.JobRow { |
| 6899 | insertRes, err := client.Insert(ctx, JobArgs{Name: jobName}, nil) |
| 6900 | require.NoError(t, err) |
| 6901 | return insertRes.Job |
| 6902 | } |
| 6903 | |
| 6904 | t.Run("Success", func(t *testing.T) { |
| 6905 | t.Parallel() |
| 6906 | |
| 6907 | var ( |
| 6908 | dbPool = riversharedtest.DBPool(ctx, t) |
| 6909 | driver = riverpgxv5.New(dbPool) |
| 6910 | schema = riverdbtest.TestSchema(ctx, t, driver, nil) |
| 6911 | config = newTestConfig(t, schema) |
| 6912 | ) |
| 6913 | |
| 6914 | // Fail/succeed jobs based on their name so we can get a mix of both to |
| 6915 | // verify. |
| 6916 | AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { |
| 6917 | if strings.HasPrefix(job.Args.Name, "failed") { |
| 6918 | return errors.New("job error") |
| 6919 | } |
| 6920 | return nil |
| 6921 | })) |
| 6922 | |
| 6923 | client := newTestClient(t, dbPool, config) |
| 6924 | |
| 6925 | subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed) |
| 6926 | t.Cleanup(cancel) |
| 6927 | |
| 6928 | jobCompleted1 := requireInsert(ctx, client, "completed1") |
| 6929 | jobCompleted2 := requireInsert(ctx, client, "completed2") |
| 6930 | jobFailed1 := requireInsert(ctx, client, "failed1") |
| 6931 | jobFailed2 := requireInsert(ctx, client, "failed2") |
| 6932 | |
| 6933 | expectedJobs := []*rivertype.JobRow{ |
| 6934 | jobCompleted1, |
| 6935 | jobCompleted2, |
| 6936 | jobFailed1, |
nothing calls this directly
no test coverage detected
searching dependent graphs…