MCPcopy Index your code
hub / github.com/riverqueue/river / Test_Client_Subscribe

Function Test_Client_Subscribe

client_test.go:6879–7139  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

6877}
6878
6879func Test_Client_Subscribe(t *testing.T) {
6880 t.Parallel()
6881
6882 ctx := context.Background()
6883
6884 type JobArgs struct {
6885 testutil.JobArgsReflectKind[JobArgs]
6886
6887 Name string `json:"name"`
6888 }
6889
6890 keyEventsByName := func(events []*Event) map[string]*Event {
6891 return sliceutil.KeyBy(events, func(event *Event) (string, *Event) {
6892 var args JobArgs
6893 require.NoError(t, json.Unmarshal(event.Job.EncodedArgs, &args))
6894 return args.Name, event
6895 })
6896 }
6897
6898 requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *rivertype.JobRow {
6899 insertRes, err := client.Insert(ctx, JobArgs{Name: jobName}, nil)
6900 require.NoError(t, err)
6901 return insertRes.Job
6902 }
6903
6904 t.Run("Success", func(t *testing.T) {
6905 t.Parallel()
6906
6907 var (
6908 dbPool = riversharedtest.DBPool(ctx, t)
6909 driver = riverpgxv5.New(dbPool)
6910 schema = riverdbtest.TestSchema(ctx, t, driver, nil)
6911 config = newTestConfig(t, schema)
6912 )
6913
6914 // Fail/succeed jobs based on their name so we can get a mix of both to
6915 // verify.
6916 AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
6917 if strings.HasPrefix(job.Args.Name, "failed") {
6918 return errors.New("job error")
6919 }
6920 return nil
6921 }))
6922
6923 client := newTestClient(t, dbPool, config)
6924
6925 subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed)
6926 t.Cleanup(cancel)
6927
6928 jobCompleted1 := requireInsert(ctx, client, "completed1")
6929 jobCompleted2 := requireInsert(ctx, client, "completed2")
6930 jobFailed1 := requireInsert(ctx, client, "failed1")
6931 jobFailed2 := requireInsert(ctx, client, "failed2")
6932
6933 expectedJobs := []*rivertype.JobRow{
6934 jobCompleted1,
6935 jobCompleted2,
6936 jobFailed1,

Callers

nothing calls this directly

Calls 15

KeyBy — Function · 0.92
DBPool — Function · 0.92
New — Function · 0.92
TestSchema — Function · 0.92
WaitOrTimeout — Function · 0.92
AddWorker — Function · 0.85
WorkFunc — Function · 0.85
newTestClient — Function · 0.85
EventKind — TypeAlias · 0.85
Insert — Method · 0.80
Subscribe — Method · 0.80
Cleanup — Method · 0.80

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…