Add adds a new queue to the client. If the client is already started, a producer for the queue is started. Context is inherited from the one given to Client.Start.
(queueName string, queueConfig QueueConfig)
| 2938 | // producer for the queue is started. Context is inherited from the one given to |
| 2939 | // Client.Start. |
| 2940 | func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { |
| 2941 | if !b.clientWillExecuteJobs { |
| 2942 | return errors.New("client is not configured to execute jobs, cannot add queue") |
| 2943 | } |
| 2944 | |
| 2945 | if err := queueConfig.validate(queueName, b.clientFetchCooldown, b.clientFetchPollInterval); err != nil { |
| 2946 | return err |
| 2947 | } |
| 2948 | |
| 2949 | b.startStopMu.Lock() |
| 2950 | defer b.startStopMu.Unlock() |
| 2951 | |
| 2952 | producer, err := b.producerAdd(queueName, queueConfig) |
| 2953 | if err != nil { |
| 2954 | return err |
| 2955 | } |
| 2956 | |
| 2957 | // Start the queue if the client is already started. |
| 2958 | if b.fetchCtx != nil && b.fetchCtx.Err() == nil { |
| 2959 | if err := producer.StartWorkContext(b.fetchCtx, b.workCtx); err != nil { |
| 2960 | return err |
| 2961 | } |
| 2962 | } |
| 2963 | |
| 2964 | return nil |
| 2965 | } |
| 2966 | |
| 2967 | // Remove removes a queue from the client, stopping the producer if the client |
| 2968 | // is running. It waits for any jobs currently being worked in the queue to |