hub / github.com/riverqueue/river / Test_Client_SubscribeConfig

Function Test_Client_SubscribeConfig

client_test.go:7144–7359

SubscribeConfig uses all the same code as Subscribe, so these are just a minimal set of new tests to make sure that the function also works when used independently.
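As a rough illustration of the relationship described above, the following self-contained sketch models a config-based subscribe call that a simpler variadic form delegates to. Everything in it (`Hub`, `Event`, `KindCompleted`, `KindFailed`, `Publish`, and the `ChanSize` field) is a hypothetical stand-in written for this example; it is not River's implementation, and it only mirrors the call shape seen in the test (a channel plus a cancel function, filtered by event kind).

```go
package main

import (
	"fmt"
	"sync"
)

// EventKind and Event are hypothetical stand-ins, not River's real types.
type EventKind string

const (
	KindCompleted EventKind = "job_completed"
	KindFailed    EventKind = "job_failed"
)

type Event struct {
	Kind EventKind
	Name string
}

// SubscribeConfig (hypothetical) lists the kinds a subscriber wants and the
// buffer size of its channel.
type SubscribeConfig struct {
	ChanSize int
	Kinds    []EventKind
}

type subscription struct {
	ch    chan *Event
	kinds map[EventKind]struct{}
}

// Hub is a hypothetical event distributor used only for this sketch.
type Hub struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]*subscription
}

func NewHub() *Hub { return &Hub{subs: make(map[int]*subscription)} }

// SubscribeConfig registers a subscriber and returns its channel along with
// a cancel function that is safe to call more than once.
func (h *Hub) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
	kinds := make(map[EventKind]struct{}, len(config.Kinds))
	for _, kind := range config.Kinds {
		kinds[kind] = struct{}{}
	}
	sub := &subscription{ch: make(chan *Event, config.ChanSize), kinds: kinds}

	h.mu.Lock()
	id := h.nextID
	h.nextID++
	h.subs[id] = sub
	h.mu.Unlock()

	var once sync.Once
	cancel := func() {
		once.Do(func() {
			h.mu.Lock()
			defer h.mu.Unlock()
			delete(h.subs, id)
			close(sub.ch) // closed under the lock, so Publish never sends on it
		})
	}
	return sub.ch, cancel
}

// Subscribe is the convenience form; it shares all logic with SubscribeConfig.
func (h *Hub) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
	return h.SubscribeConfig(&SubscribeConfig{ChanSize: 16, Kinds: kinds})
}

// Publish delivers an event to every subscriber that asked for its kind,
// dropping the event for a subscriber whose buffer is full.
func (h *Hub) Publish(event *Event) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, sub := range h.subs {
		if _, ok := sub.kinds[event.Kind]; !ok {
			continue
		}
		select {
		case sub.ch <- event:
		default:
		}
	}
}

func main() {
	hub := NewHub()
	events, cancel := hub.SubscribeConfig(&SubscribeConfig{
		ChanSize: 4,
		Kinds:    []EventKind{KindFailed},
	})
	hub.Publish(&Event{Kind: KindCompleted, Name: "completed1"}) // filtered out
	hub.Publish(&Event{Kind: KindFailed, Name: "failed1"})
	cancel()
	for event := range events {
		fmt.Println(event.Kind, event.Name)
	}
}
```

Because both entry points share one code path in this sketch, a small independent test of the config form is enough to cover it, which is the same reasoning the doc comment gives for keeping this test minimal.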

(t *testing.T)

Source from the content-addressed store, hash-verified

// minimal set of new tests to make sure that the function also works when used
// independently.
func Test_Client_SubscribeConfig(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	t.Run("Success", func(t *testing.T) {
		t.Parallel()

		var (
			dbPool = riversharedtest.DBPool(ctx, t)
			driver = riverpgxv5.New(dbPool)
			schema = riverdbtest.TestSchema(ctx, t, driver, nil)
			config = newTestConfig(t, schema)
		)

		type JobArgs struct {
			testutil.JobArgsReflectKind[JobArgs]

			Name string `json:"name"`
		}

		// Fail/succeed jobs based on their name so we can get a mix of both to
		// verify.
		AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
			if strings.HasPrefix(job.Args.Name, "failed") {
				return errors.New("job error")
			}
			return nil
		}))

		client := newTestClient(t, dbPool, config)

		subscribeChan, cancel := client.SubscribeConfig(&SubscribeConfig{
			Kinds: []EventKind{EventKindJobCompleted, EventKindJobFailed},
		})
		t.Cleanup(cancel)

		keyEventsByName := func(events []*Event) map[string]*Event {
			return sliceutil.KeyBy(events, func(event *Event) (string, *Event) {
				var args JobArgs
				require.NoError(t, json.Unmarshal(event.Job.EncodedArgs, &args))
				return args.Name, event
			})
		}

		requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *rivertype.JobRow {
			insertRes, err := client.Insert(ctx, JobArgs{Name: jobName}, nil)
			require.NoError(t, err)
			return insertRes.Job
		}

		jobCompleted1 := requireInsert(ctx, client, "completed1")
		jobCompleted2 := requireInsert(ctx, client, "completed2")
		jobFailed1 := requireInsert(ctx, client, "failed1")
		jobFailed2 := requireInsert(ctx, client, "failed2")

		expectedJobs := []*rivertype.JobRow{
			jobCompleted1,
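
The `keyEventsByName` closure in the listing decodes each event's JSON-encoded args and indexes the events by the `name` field, so assertions can look up an event per job regardless of arrival order. A minimal standalone sketch of that idea follows. The `event` and `jobArgs` types and the generic `keyBy` helper are hypothetical and written for this example; they are assumed to behave like the helper used in the test but are not taken from River's source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for an event carrying JSON-encoded job args.
type event struct {
	Kind        string
	EncodedArgs []byte
}

// jobArgs mirrors the single-field args struct used in the test.
type jobArgs struct {
	Name string `json:"name"`
}

// keyBy is a hypothetical generic helper: it builds a map from a slice by
// asking fn for the key and value of each element. Later duplicates overwrite
// earlier ones.
func keyBy[T any, K comparable, V any](items []T, fn func(T) (K, V)) map[K]V {
	out := make(map[K]V, len(items))
	for _, item := range items {
		key, value := fn(item)
		out[key] = value
	}
	return out
}

// keyEventsByName indexes events by the decoded "name" arg and reports the
// first decode error instead of failing a test directly.
func keyEventsByName(events []*event) (map[string]*event, error) {
	var firstErr error
	byName := keyBy(events, func(e *event) (string, *event) {
		var args jobArgs
		if err := json.Unmarshal(e.EncodedArgs, &args); err != nil && firstErr == nil {
			firstErr = err
		}
		return args.Name, e
	})
	return byName, firstErr
}

func main() {
	events := []*event{
		{Kind: "job_completed", EncodedArgs: []byte(`{"name":"completed1"}`)},
		{Kind: "job_failed", EncodedArgs: []byte(`{"name":"failed1"}`)},
	}
	byName, err := keyEventsByName(events)
	if err != nil {
		panic(err)
	}
	fmt.Println(byName["failed1"].Kind)
}
```

Keying by name rather than by position fits a test like this one because completed and failed events may arrive on the subscription channel in any order.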

Callers

nothing calls this directly

Calls 15

DBPool · Function · 0.92
New · Function · 0.92
TestSchema · Function · 0.92
KeyBy · Function · 0.92
WaitOrTimeout · Function · 0.92
WaitOrTimeoutN · Function · 0.92
AddWorker · Function · 0.85
WorkFunc · Function · 0.85
newTestClient · Function · 0.85
EventKind · TypeAlias · 0.85
Cleanup · Method · 0.80
Insert · Method · 0.80

Tested by

no test coverage detected
