subscribeEvents will loop until it can successfully call d.Client.Events() without immediately receiving an error. It applies exponential backoff on failures. Returns the new subscription, or an error if the context or tomb is done.
(ctx context.Context)
| 379 | // without immediately receiving an error. It applies exponential backoff on failures. |
| 380 | // Returns the new (eventsChan, errChan) pair or an error if context/tomb is done. |
| 381 | func (d *Source) subscribeEvents(ctx context.Context) (*subscription, error) { |
| 382 | select { |
| 383 | case <-ctx.Done(): |
| 384 | return nil, ctx.Err() |
| 385 | case <-d.t.Dying(): |
| 386 | return nil, errors.New("connection aborted, shutting down docker watcher") |
| 387 | default: |
| 388 | } |
| 389 | |
| 390 | d.logger.Infof("Subscribing to Docker events") |
| 391 | |
| 392 | operation := func() (*subscription, error) { |
| 393 | select { |
| 394 | case <-ctx.Done(): |
| 395 | return nil, backoff.Permanent(ctx.Err()) |
| 396 | case <-d.t.Dying(): |
| 397 | return nil, backoff.Permanent(errors.New("connection aborted, shutting down docker watcher")) |
| 398 | default: |
| 399 | } |
| 400 | |
| 401 | return d.trySubscribeEvents(ctx) |
| 402 | } |
| 403 | |
| 404 | notify := func(err error, wait time.Duration) { |
| 405 | d.logger.Warnf("failed to subscribe to Docker events: %v; retrying in %s", err, wait) |
| 406 | } |
| 407 | |
| 408 | bo := d.backoffFactory() |
| 409 | |
| 410 | sub, err := backoff.Retry(ctx, operation, backoff.WithBackOff(bo), backoff.WithNotify(notify)) |
| 411 | if err != nil { |
| 412 | return nil, err |
| 413 | } |
| 414 | |
| 415 | d.logger.Info("successfully subscribed to Docker events") |
| 416 | |
| 417 | return sub, nil |
| 418 | } |
| 419 | |
| 420 | func (d *Source) Watch(ctx context.Context, containerChan chan *ContainerConfig, containerDeleteChan chan *ContainerConfig, serviceChan chan *ContainerConfig, serviceDeleteChan chan *ContainerConfig) error { |
| 421 | err := d.checkContainers(ctx, containerChan, containerDeleteChan) |
no test coverage detected