`Execute()` implements the SubTask interface. It loads data from the Tool Layer tables using `Ctx.GetDal()`, converts each row using the `converter.args.Convert` handler, and then saves the results to the Domain Layer tables using `BatchSaveDivider`.
| 76 | // It loads data from Tool Layer Tables using `Ctx.GetDal()`, convert Data using `converter.args.Convert` handler |
| 77 | // Then save data to Domain Layer Tables using BatchSaveDivider |
| 78 | func (converter *DataConverter) Execute() errors.Error { |
| 79 | // load data from database |
| 80 | db := converter.args.Ctx.GetDal() |
| 81 | |
| 82 | // batch save divider |
| 83 | RAW_DATA_ORIGIN := "RawDataOrigin" |
| 84 | divider := NewBatchSaveDivider(converter.args.Ctx, converter.args.BatchSize, converter.table, converter.params) |
| 85 | |
| 86 | // set progress |
| 87 | converter.args.Ctx.SetProgress(0, -1) |
| 88 | |
| 89 | cursor := converter.args.Input |
| 90 | defer cursor.Close() |
| 91 | ctx := converter.args.Ctx.GetContext() |
| 92 | // iterate all rows |
| 93 | for cursor.Next() { |
| 94 | select { |
| 95 | case <-ctx.Done(): |
| 96 | return errors.Convert(ctx.Err()) |
| 97 | default: |
| 98 | } |
| 99 | inputRow := reflect.New(converter.args.InputRowType).Interface() |
| 100 | err := db.Fetch(cursor, inputRow) |
| 101 | if err != nil { |
| 102 | return errors.Default.Wrap(err, "error fetching rows") |
| 103 | } |
| 104 | |
| 105 | results, err := converter.args.Convert(inputRow) |
| 106 | if err != nil { |
| 107 | return errors.Default.Wrap(err, "error calling Converter plugin implementation") |
| 108 | } |
| 109 | |
| 110 | for _, result := range results { |
| 111 | // get the batch operator for the specific type |
| 112 | batch, err := divider.ForType(reflect.TypeOf(result)) |
| 113 | if err != nil { |
| 114 | return errors.Default.Wrap(err, "error getting batch from result") |
| 115 | } |
| 116 | // set raw data origin field |
| 117 | origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN) |
| 118 | if origin.IsValid() { |
| 119 | origin.Set(reflect.ValueOf(inputRow).Elem().FieldByName(RAW_DATA_ORIGIN)) |
| 120 | } |
| 121 | // records get saved into db when slots were max outed |
| 122 | err = batch.Add(result) |
| 123 | if err != nil { |
| 124 | return errors.Default.Wrap(err, "error adding result to batch") |
| 125 | } |
| 126 | } |
| 127 | converter.args.Ctx.IncProgress(1) |
| 128 | } |
| 129 | |
| 130 | // save the last batches |
| 131 | return divider.Close() |
| 132 | } |
| 133 | |
| 134 | // Compile-time assertion that *DataConverter implements the plugin.SubTask interface |
| 135 | var _ plugin.SubTask = (*DataConverter)(nil) |
no test coverage detected