try waits for the readinessTracker to be ready and then attempts to execute the passed callback. If the readinessTracker does not reach a ready state, it will timeout.
(ctx context.Context, fn func() error, tracker *readinessTracker)
| 45 | // try waits for the readinessTracker to be ready and then attempts to execute the passed callback. |
| 46 | // If the readinessTracker does not reach a ready state, it will timeout. |
| 47 | func (a *apiActivator) try(ctx context.Context, fn func() error, tracker *readinessTracker) error { |
| 48 | var execErr error |
| 49 | |
| 50 | if err := a.breaker.Maybe(ctx, func() { |
| 51 | ctx, cancel := context.WithTimeout(ctx, consts.WaitForReadyReplicasTimeout) |
| 52 | defer cancel() |
| 53 | |
| 54 | if !tracker.IsReady() { |
| 55 | loop: |
| 56 | for { |
| 57 | select { |
| 58 | case <-ctx.Done(): |
| 59 | execErr = errors.Wrap(ctx.Err(), "no ready replicas available") |
| 60 | return |
| 61 | case <-tracker.Wait(): |
| 62 | break loop |
| 63 | } |
| 64 | } |
| 65 | } |
| 66 | execErr = fn() |
| 67 | }); err != nil { |
| 68 | return err |
| 69 | } |
| 70 | |
| 71 | return execErr |
| 72 | } |
| 73 | |
| 74 | // updateQueueParams updates the breaker queue parameters (not thread safe) |
| 75 | func (a *apiActivator) updateQueueParams(maxQueueLength, maxConcurrency int) { |