hub / github.com/riverqueue/river / NewClient

Function NewClient

client.go:781–1068

NewClient creates a new Client with the given database driver and configuration. Currently only one driver, Pgx v5, is supported; see package riverpgxv5. The function takes a generic type parameter TTx representing a transaction type, but it can be omitted because it will generally be inferred from the driver.

(driver riverdriver.Driver[TTx], config *Config)
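
The inference of TTx described above can be illustrated with a minimal, standard-library-only sketch. The types here (Driver, fakeTx, fakeDriver, newFakeDriver, and the reduced Client and NewClient) are hypothetical stand-ins that only mirror the shape of the signature; they are not River's actual types, and the real function also takes a *Config.

```go
package main

import "fmt"

// Driver is a hypothetical stand-in for riverdriver.Driver[TTx]. It is
// parameterized by the transaction type the driver works with.
type Driver[TTx any] interface {
	// Begin returns a new transaction of the driver's transaction type.
	Begin() TTx
}

// fakeTx and fakeDriver are illustrative only; they are not River types.
type fakeTx struct{ id int }

type fakeDriver struct{}

func (fakeDriver) Begin() fakeTx { return fakeTx{id: 1} }

// newFakeDriver returns the driver as Driver[fakeTx], so the transaction
// type is part of the argument's static type.
func newFakeDriver() Driver[fakeTx] { return fakeDriver{} }

// Client mirrors the shape of Client[TTx] in a much reduced form.
type Client[TTx any] struct {
	driver Driver[TTx]
}

// NewClient uses the same type parameter layout as the signature above,
// minus the configuration argument.
func NewClient[TTx any](driver Driver[TTx]) *Client[TTx] {
	return &Client[TTx]{driver: driver}
}

func main() {
	// TTx is inferred as fakeTx from the argument's type.
	inferred := NewClient(newFakeDriver())

	// Spelling the type argument out is equivalent, just noisier.
	explicit := NewClient[fakeTx](newFakeDriver())

	fmt.Printf("%T\n%T\n", inferred, explicit)
}
```

Both calls produce a client of the same instantiated type, which is why call sites normally leave the type argument off.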

Source from the content-addressed store, hash-verified

// // handle error
// }
func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client[TTx], error) {
	if driver == nil {
		return nil, errMissingDriver
	}
	if config == nil {
		return nil, errMissingConfig
	}

	config = config.WithDefaults()

	if err := config.validate(); err != nil {
		return nil, err
	}

	archetype := baseservice.NewArchetype(config.Logger)
	if config.Test.Time != nil {
		if withStub, ok := config.Test.Time.(baseservice.TimeGeneratorWithStub); ok {
			archetype.Time = withStub
		} else {
			archetype.Time = &baseservice.TimeGeneratorWithStubWrapper{TimeGenerator: config.Test.Time}
		}
	}

	for _, hook := range config.Hooks {
		if withBaseService, ok := hook.(baseservice.WithBaseService); ok {
			baseservice.Init(archetype, withBaseService)
		}
	}

	client := &Client[TTx]{
		clientNotifyBundle: &ClientNotifyBundle[TTx]{
			config: config,
			driver: driver,
		},
		config:               config,
		driver:               driver,
		hookLookupByJob:      hooklookup.NewJobHookLookup(),
		hookLookupGlobal:     hooklookup.NewHookLookup(config.Hooks),
		producersByQueueName: make(map[string]*producer),
		testSignals:          clientTestSignals{},
		workCancel:           func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
	}

	client.queues = &QueueBundle{
		clientFetchCooldown:     config.FetchCooldown,
		clientFetchPollInterval: config.FetchPollInterval,
		clientWillExecuteJobs:   config.willExecuteJobs(),
		producerAdd:             client.producerAdd,
		producerRemove:          client.producerRemove,
	}

	baseservice.Init(archetype, &client.baseService)
	client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
	client.insertNotifyLimiter = notifylimiter.NewLimiter(archetype, config.FetchCooldown)

	// Validation ensures that config.JobInsertMiddleware/WorkerMiddleware or
	// the more abstract config.Middleware for middleware are set, but not both,
	// so in practice we never append all three of these to each other.
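
The excerpt opens with two patterns that may be easier to follow in isolation: nil guards that return sentinel errors, and a type assertion that uses the configured time generator directly when it already supports stubbing and wraps it otherwise. The following is a simplified sketch of those two steps using only the standard library. Every name in it (TimeGenerator, TimeGeneratorWithStub, stubWrapper, systemClock, fixedClock, resolveTime, and the reduced Config) is a hypothetical stand-in, and the method names and the system-clock default are assumptions, not River's baseservice API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// TimeGenerator is a hypothetical minimal clock interface.
type TimeGenerator interface {
	NowUTC() time.Time
}

// TimeGeneratorWithStub is a hypothetical clock that can also be frozen.
type TimeGeneratorWithStub interface {
	TimeGenerator
	StubNowUTC(t time.Time) time.Time
}

// stubWrapper adds stubbing on top of any plain TimeGenerator.
type stubWrapper struct {
	TimeGenerator
	stubbed *time.Time
}

// NowUTC returns the stubbed time if one is set, else the wrapped clock's time.
func (w *stubWrapper) NowUTC() time.Time {
	if w.stubbed != nil {
		return *w.stubbed
	}
	return w.TimeGenerator.NowUTC()
}

// StubNowUTC freezes the clock at t and returns t.
func (w *stubWrapper) StubNowUTC(t time.Time) time.Time {
	w.stubbed = &t
	return t
}

// systemClock reads the real wall clock.
type systemClock struct{}

func (systemClock) NowUTC() time.Time { return time.Now().UTC() }

// fixedClock always reports the same instant and cannot be stubbed itself.
type fixedClock struct{ t time.Time }

func (c fixedClock) NowUTC() time.Time { return c.t }

// Config is a reduced, hypothetical configuration with only a test clock.
type Config struct {
	Time TimeGenerator
}

var errMissingConfig = errors.New("missing config")

// resolveTime rejects a nil config, then picks the clock: the configured one
// as-is if it already supports stubbing, a wrapper around it if it does not,
// and (an assumption of this sketch) a wrapped system clock if none is set.
func resolveTime(config *Config) (TimeGeneratorWithStub, error) {
	if config == nil {
		return nil, errMissingConfig
	}
	if config.Time == nil {
		return &stubWrapper{TimeGenerator: systemClock{}}, nil
	}
	if withStub, ok := config.Time.(TimeGeneratorWithStub); ok {
		return withStub, nil
	}
	return &stubWrapper{TimeGenerator: config.Time}, nil
}

func main() {
	if _, err := resolveTime(nil); err != nil {
		fmt.Println("error:", err)
	}

	fixed := fixedClock{t: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)}
	gen, _ := resolveTime(&Config{Time: fixed})
	fmt.Println(gen.NowUTC())

	// The wrapper makes the plain clock stubbable.
	gen.StubNowUTC(fixed.t.Add(time.Hour))
	fmt.Println(gen.NowUTC())
}
```

Wrapping only when the assertion fails keeps a caller-supplied stubbable clock intact, so a test that holds a reference to it can still control the time seen by the constructed object.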

Callers 15

Example_uniqueJob · Function · 0.92
Example_jobArgsHooks · Function · 0.92
Example_jobSnooze · Function · 0.92
Example_subscription · Function · 0.92
Example_insertAndWork · Function · 0.92
Example_periodicJob · Function · 0.92
Example_cronJob · Function · 0.92
Example_batchInsert · Function · 0.92
Example_resumableCursor · Function · 0.92

Calls 15

producerAdd · Method · 0.95
NewArchetype · Function · 0.92
Init · Function · 0.92
NewJobHookLookup · Function · 0.92
NewHookLookup · Function · 0.92
NewLimiter · Function · 0.92
DefaultMiddleware · Function · 0.92
NewMiddlewareLookup · Function · 0.92
NewBatchCompleter · Function · 0.92
New · Function · 0.92
NewElector · Function · 0.92
StartStopFunc · Function · 0.92

Tested by 15

Example_uniqueJob · Function · 0.74
Example_jobArgsHooks · Function · 0.74
Example_jobSnooze · Function · 0.74
Example_subscription · Function · 0.74
Example_insertAndWork · Function · 0.74
Example_periodicJob · Function · 0.74
Example_cronJob · Function · 0.74
Example_batchInsert · Function · 0.74
Example_resumableCursor · Function · 0.74
