NewClient creates a new Client with the given database driver and configuration. Currently only one driver is supported, which is Pgx v5. See package riverpgxv5. The function takes a generic parameter TTx representing a transaction type, but it can be omitted because it'll generally always be inferred from the driver.
(driver riverdriver.Driver[TTx], config *Config)
| 779 | // // handle error |
| 780 | // } |
| 781 | func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client[TTx], error) { |
| 782 | if driver == nil { |
| 783 | return nil, errMissingDriver |
| 784 | } |
| 785 | if config == nil { |
| 786 | return nil, errMissingConfig |
| 787 | } |
| 788 | |
| 789 | config = config.WithDefaults() |
| 790 | |
| 791 | if err := config.validate(); err != nil { |
| 792 | return nil, err |
| 793 | } |
| 794 | |
| 795 | archetype := baseservice.NewArchetype(config.Logger) |
| 796 | if config.Test.Time != nil { |
| 797 | if withStub, ok := config.Test.Time.(baseservice.TimeGeneratorWithStub); ok { |
| 798 | archetype.Time = withStub |
| 799 | } else { |
| 800 | archetype.Time = &baseservice.TimeGeneratorWithStubWrapper{TimeGenerator: config.Test.Time} |
| 801 | } |
| 802 | } |
| 803 | |
| 804 | for _, hook := range config.Hooks { |
| 805 | if withBaseService, ok := hook.(baseservice.WithBaseService); ok { |
| 806 | baseservice.Init(archetype, withBaseService) |
| 807 | } |
| 808 | } |
| 809 | |
| 810 | client := &Client[TTx]{ |
| 811 | clientNotifyBundle: &ClientNotifyBundle[TTx]{ |
| 812 | config: config, |
| 813 | driver: driver, |
| 814 | }, |
| 815 | config: config, |
| 816 | driver: driver, |
| 817 | hookLookupByJob: hooklookup.NewJobHookLookup(), |
| 818 | hookLookupGlobal: hooklookup.NewHookLookup(config.Hooks), |
| 819 | producersByQueueName: make(map[string]*producer), |
| 820 | testSignals: clientTestSignals{}, |
| 821 | workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up |
| 822 | } |
| 823 | |
| 824 | client.queues = &QueueBundle{ |
| 825 | clientFetchCooldown: config.FetchCooldown, |
| 826 | clientFetchPollInterval: config.FetchPollInterval, |
| 827 | clientWillExecuteJobs: config.willExecuteJobs(), |
| 828 | producerAdd: client.producerAdd, |
| 829 | producerRemove: client.producerRemove, |
| 830 | } |
| 831 | |
| 832 | baseservice.Init(archetype, &client.baseService) |
| 833 | client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is |
| 834 | client.insertNotifyLimiter = notifylimiter.NewLimiter(archetype, config.FetchCooldown) |
| 835 | |
| 836 | // Validation ensures that config.JobInsertMiddleware/WorkerMiddleware or |
| 837 | // the more abstract config.Middleware for middleware are set, but not both, |
| 838 | // so in practice we never append all three of these to each other. |
searching dependent graphs…