Execute will start collection
()
| 152 | |
| 153 | // Execute will start collection |
| 154 | func (collector *ApiCollector) Execute() errors.Error { |
| 155 | logger := collector.args.Ctx.GetLogger() |
| 156 | logger.Info("start api collection") |
| 157 | |
| 158 | // make sure table is created |
| 159 | db := collector.args.Ctx.GetDal() |
| 160 | err := collector.ensureRawTable() |
| 161 | if err != nil { |
| 162 | return errors.Default.Wrap(err, "error auto-migrating collector") |
| 163 | } |
| 164 | |
| 165 | isIncremental := collector.args.Incremental |
| 166 | syncPolicy := collector.args.Ctx.TaskContext().SyncPolicy() |
| 167 | if syncPolicy != nil && syncPolicy.FullSync { |
| 168 | isIncremental = false |
| 169 | } |
| 170 | // flush data if not incremental collection |
| 171 | if !isIncremental { |
| 172 | err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params)) |
| 173 | if err != nil { |
| 174 | return errors.Default.Wrap(err, "error deleting data from collector") |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | // if MinTickInterval was specified |
| 179 | if collector.args.MinTickInterval != nil { |
| 180 | minTickInterval := *collector.args.MinTickInterval |
| 181 | if minTickInterval <= time.Duration(0) { |
| 182 | return errors.Default.Wrap(err, "MinTickInterval must be greater than 0") |
| 183 | } |
| 184 | oldTickInterval := collector.args.ApiClient.GetTickInterval() |
| 185 | if oldTickInterval < minTickInterval { |
| 186 | // reset the tick interval only if it exceeded the specified limit |
| 187 | logger.Info("set tick interval to %v", minTickInterval.String()) |
| 188 | collector.args.ApiClient.Reset(minTickInterval) |
| 189 | defer func() { |
| 190 | logger.Info("restore tick interval to %v", oldTickInterval.String()) |
| 191 | collector.args.ApiClient.Reset(oldTickInterval) |
| 192 | }() |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | collector.args.Ctx.SetProgress(0, -1) |
| 197 | if collector.args.Input != nil { |
| 198 | iterator := collector.args.Input |
| 199 | defer iterator.Close() |
| 200 | apiClient := collector.args.ApiClient |
| 201 | if apiClient == nil { |
| 202 | return errors.Default.New("api_collector can not Execute with nil apiClient") |
| 203 | } |
| 204 | for { |
| 205 | if !iterator.HasNext() || apiClient.HasError() { |
| 206 | err = collector.args.ApiClient.WaitAsync() |
| 207 | if err != nil { |
| 208 | return err |
| 209 | } |
| 210 | if !iterator.HasNext() || apiClient.HasError() { |
| 211 | break |