MCPcopy
hub / github.com/apache/devlake / Execute

Method Execute

backend/helpers/pluginhelper/api/data_convertor.go:78–132  ·  view source on GitHub ↗

Execute function implements Subtask interface. It loads data from Tool Layer Tables using `Ctx.GetDal()`, convert Data using `converter.args.Convert` handler Then save data to Domain Layer Tables using BatchSaveDivider

()

Source from the content-addressed store, hash-verified

76// It loads data from Tool Layer Tables using `Ctx.GetDal()`, convert Data using `converter.args.Convert` handler
77// Then save data to Domain Layer Tables using BatchSaveDivider
78func (converter *DataConverter) Execute() errors.Error {
79 // load data from database
80 db := converter.args.Ctx.GetDal()
81
82 // batch save divider
83 RAW_DATA_ORIGIN := "RawDataOrigin"
84 divider := NewBatchSaveDivider(converter.args.Ctx, converter.args.BatchSize, converter.table, converter.params)
85
86 // set progress
87 converter.args.Ctx.SetProgress(0, -1)
88
89 cursor := converter.args.Input
90 defer cursor.Close()
91 ctx := converter.args.Ctx.GetContext()
92 // iterate all rows
93 for cursor.Next() {
94 select {
95 case <-ctx.Done():
96 return errors.Convert(ctx.Err())
97 default:
98 }
99 inputRow := reflect.New(converter.args.InputRowType).Interface()
100 err := db.Fetch(cursor, inputRow)
101 if err != nil {
102 return errors.Default.Wrap(err, "error fetching rows")
103 }
104
105 results, err := converter.args.Convert(inputRow)
106 if err != nil {
107 return errors.Default.Wrap(err, "error calling Converter plugin implementation")
108 }
109
110 for _, result := range results {
111 // get the batch operator for the specific type
112 batch, err := divider.ForType(reflect.TypeOf(result))
113 if err != nil {
114 return errors.Default.Wrap(err, "error getting batch from result")
115 }
116 // set raw data origin field
117 origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
118 if origin.IsValid() {
119 origin.Set(reflect.ValueOf(inputRow).Elem().FieldByName(RAW_DATA_ORIGIN))
120 }
121 // records get saved into db when slots were max outed
122 err = batch.Add(result)
123 if err != nil {
124 return errors.Default.Wrap(err, "error adding result to batch")
125 }
126 }
127 converter.args.Ctx.IncProgress(1)
128 }
129
130 // save the last batches
131 return divider.Close()
132}
133
134// Check if DataConverter implements SubTask interface
135var _ plugin.SubTask = (*DataConverter)(nil)

Callers 15

ConvertTaskChangelogFunction · 0.95
ConvertStoryChangelogFunction · 0.95
ConvertIterationFunction · 0.95
ConvertStoryLabelsFunction · 0.95
EnrichTaskCustomFieldsFunction · 0.95
ConvertAccountsFunction · 0.95
EnrichBugCustomFieldsFunction · 0.95
ConvertWorklogFunction · 0.95
EnrichStoryCustomFieldsFunction · 0.95
ConvertTaskFunction · 0.95
ConvertTaskCommitFunction · 0.95
ConvertTaskLabelsFunction · 0.95

Calls 15

ForTypeMethod · 0.95
CloseMethod · 0.95
NewBatchSaveDividerFunction · 0.85
ErrMethod · 0.80
WrapMethod · 0.80
GetDalMethod · 0.65
SetProgressMethod · 0.65
CloseMethod · 0.65
GetContextMethod · 0.65
NextMethod · 0.65
NewMethod · 0.65
FetchMethod · 0.65

Tested by

no test coverage detected