Execute sub-task
()
| 71 | |
| 72 | // Execute sub-task |
| 73 | func (extractor *ApiExtractor) Execute() errors.Error { |
| 74 | // load data from database |
| 75 | db := extractor.args.Ctx.GetDal() |
| 76 | logger := extractor.args.Ctx.GetLogger() |
| 77 | if !db.HasTable(extractor.table) { |
| 78 | return nil |
| 79 | } |
| 80 | clauses := []dal.Clause{ |
| 81 | dal.From(extractor.table), |
| 82 | dal.Where("params = ?", extractor.params), |
| 83 | dal.Orderby("id ASC"), |
| 84 | } |
| 85 | |
| 86 | count, err := db.Count(clauses...) |
| 87 | if err != nil { |
| 88 | return errors.Default.Wrap(err, "error getting count of clauses") |
| 89 | } |
| 90 | cursor, err := db.Cursor(clauses...) |
| 91 | if err != nil { |
| 92 | return errors.Default.Wrap(err, "error running DB query") |
| 93 | } |
| 94 | logger.Info("get data from %s where params=%s and got %d", extractor.table, extractor.params, count) |
| 95 | defer cursor.Close() |
| 96 | // batch save divider |
| 97 | divider := NewBatchSaveDivider(extractor.args.Ctx, extractor.args.BatchSize, extractor.table, extractor.params) |
| 98 | |
| 99 | // progress |
| 100 | extractor.args.Ctx.SetProgress(0, -1) |
| 101 | ctx := extractor.args.Ctx.GetContext() |
| 102 | // iterate all rows |
| 103 | for cursor.Next() { |
| 104 | select { |
| 105 | case <-ctx.Done(): |
| 106 | return errors.Convert(ctx.Err()) |
| 107 | default: |
| 108 | } |
| 109 | row := &RawData{} |
| 110 | err = db.Fetch(cursor, row) |
| 111 | if err != nil { |
| 112 | return errors.Default.Wrap(err, "error fetching row") |
| 113 | } |
| 114 | |
| 115 | results, err := extractor.args.Extract(row) |
| 116 | if err != nil { |
| 117 | return errors.Default.Wrap(err, "error calling plugin Extract implementation") |
| 118 | } |
| 119 | for _, result := range results { |
| 120 | // get the batch operator for the specific type |
| 121 | batch, err := divider.ForType(reflect.TypeOf(result)) |
| 122 | if err != nil { |
| 123 | return errors.Default.Wrap(err, "error getting batch from result") |
| 124 | } |
| 125 | // set raw data origin field |
| 126 | setRawDataOrigin(result, common.RawDataOrigin{ |
| 127 | RawDataTable: extractor.table, |
| 128 | RawDataId: row.ID, |
| 129 | RawDataParams: row.Params, |
| 130 | }) |
no test coverage detected