| 111 | } |
| 112 | |
| 113 | func (e *engineImpl) Start() { |
| 114 | e.tomb.Go(e.reporting) |
| 115 | if os.Getenv(context.KeySvcName) == specv1.BaetylCore { |
| 116 | e.tomb.Go(e.cleaning) |
| 117 | } |
| 118 | ch, err := e.pb.Subscribe(sync.TopicDownside) |
| 119 | if err != nil { |
| 120 | e.log.Error("failed to subscribe downside topic", log.Any("topic", sync.TopicDownside), log.Error(err)) |
| 121 | } |
| 122 | e.downsideChan = ch |
| 123 | e.downsideProcess = pubsub.NewProcessor(e.downsideChan, 0, &handlerDownside{e}) |
| 124 | e.downsideProcess.Start() |
| 125 | } |
| 126 | |
| 127 | func (e *engineImpl) ReportAndDesire() error { |
| 128 | return errors.Trace(e.reportAndDesireAsync(false)) |