hub / github.com/apache/devlake / ExtractQDevS3Data

Function ExtractQDevS3Data

backend/plugins/q_dev/tasks/s3_data_extractor.go:39–107

ExtractQDevS3Data downloads CSV data from S3 and parses it.

(taskCtx plugin.SubTaskContext)
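
The parsing itself happens in `processCSVData`, whose body is not shown on this page. As a rough illustration of what parsing a CSV stream (such as an S3 object body) can look like, here is a minimal sketch using Go's standard `encoding/csv` package. The names `parseCSV`, `parseCSVString`, and `sampleCSV`, and the column names in the sample, are hypothetical and not taken from the plugin.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// sampleCSV is made-up data; the real Q Dev CSV columns are not shown in the source.
const sampleCSV = "user_id,date,suggestions\nu1,2024-01-01,3\nu2,2024-01-01,7\n"

// parseCSV reads a header row and then every record from r,
// returning each record as a column-name -> value map.
func parseCSV(r io.Reader) ([]map[string]string, error) {
	reader := csv.NewReader(r)
	header, err := reader.Read()
	if err != nil {
		return nil, fmt.Errorf("failed to read CSV header: %w", err)
	}
	var rows []map[string]string
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("failed to read CSV row: %w", err)
		}
		// csv.Reader rejects rows whose field count differs from the
		// first row, so indexing record by header position is safe.
		row := make(map[string]string, len(header))
		for i, name := range header {
			row[name] = record[i]
		}
		rows = append(rows, row)
	}
	return rows, nil
}

// parseCSVString is a convenience wrapper for in-memory input.
func parseCSVString(s string) ([]map[string]string, error) {
	return parseCSV(strings.NewReader(s))
}

func main() {
	rows, err := parseCSVString(sampleCSV)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, row := range rows {
		fmt.Println(row["user_id"], row["suggestions"])
	}
}
```

Because `parseCSV` accepts any `io.Reader`, the same function would work on a response body streamed from object storage without buffering the whole file first.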

Source from the content-addressed store, hash-verified


// ExtractQDevS3Data downloads CSV data from S3 and parses it
func ExtractQDevS3Data(taskCtx plugin.SubTaskContext) errors.Error {
	data := taskCtx.GetData().(*QDevTaskData)
	db := taskCtx.GetDal()

	// Query metadata of unprocessed CSV files (excluding .json.gz log files)
	cursor, err := db.Cursor(
		dal.From(&models.QDevS3FileMeta{}),
		dal.Where("connection_id = ? AND processed = ? AND file_name LIKE ?",
			data.Options.ConnectionId, false, "%.csv"),
	)
	if err != nil {
		return errors.Default.Wrap(err, "failed to get file metadata cursor")
	}
	defer cursor.Close()

	taskCtx.SetProgress(0, -1)

	// Process each file
	for cursor.Next() {
		fileMeta := &models.QDevS3FileMeta{}
		err = db.Fetch(cursor, fileMeta)
		if err != nil {
			return errors.Default.Wrap(err, "failed to fetch file metadata")
		}

		// Fetch the file content
		getInput := &s3.GetObjectInput{
			Bucket: aws.String(data.S3Client.Bucket),
			Key:    aws.String(fileMeta.S3Path),
		}

		getResult, err := data.S3Client.S3.GetObject(getInput)
		if err != nil {
			return errors.Convert(err)
		}

		// Use a transaction to process the file and update its status
		tx := db.Begin()
		csvErr := processCSVData(taskCtx, tx, getResult.Body, fileMeta)
		if csvErr != nil {
			if rollbackErr := tx.Rollback(); rollbackErr != nil {
				taskCtx.GetLogger().Error(rollbackErr, "failed to rollback transaction")
			}
			return errors.Default.Wrap(csvErr, fmt.Sprintf("failed to process CSV file %s", fileMeta.FileName))
		}

		// Update file processing status within the same transaction
		fileMeta.Processed = true
		now := time.Now()
		fileMeta.ProcessedTime = &now
		err = tx.Update(fileMeta)
		if err != nil {
			if rollbackErr := tx.Rollback(); rollbackErr != nil {
				taskCtx.GetLogger().Error(rollbackErr, "failed to rollback transaction")
			}
			return errors.Default.Wrap(err, "failed to update file metadata")
		}
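
The listing handles each file in its own transaction: the parsed rows and the `processed` flag are written together, and a failure in either step rolls both back. The pattern can be sketched with a hypothetical in-memory transaction in place of DevLake's `dal` interface. The names `memTx`, `fileMeta`, `processFile`, and `errBadHeader` are illustrative only. The listing stops at source line 96, so the commit step here is an assumption about how such a loop would typically finish.

```go
package main

import (
	"errors"
	"fmt"
)

// errBadHeader is a made-up parse failure used for the demonstration.
var errBadHeader = errors.New("bad header")

// fileMeta is a hypothetical stand-in for models.QDevS3FileMeta.
type fileMeta struct {
	FileName  string
	Processed bool
}

// memTx is a hypothetical in-memory transaction: writes are staged
// and only become visible in store on Commit.
type memTx struct {
	store  map[string]bool
	staged map[string]bool
}

func begin(store map[string]bool) *memTx {
	return &memTx{store: store, staged: map[string]bool{}}
}

// Update stages the processed flag for a file.
func (tx *memTx) Update(m *fileMeta) { tx.staged[m.FileName] = m.Processed }

// Rollback discards everything staged so far.
func (tx *memTx) Rollback() { tx.staged = map[string]bool{} }

// Commit copies staged writes into the store.
func (tx *memTx) Commit() {
	for name, processed := range tx.staged {
		tx.store[name] = processed
	}
	tx.staged = map[string]bool{}
}

// processFile runs the parse step and the status update in one
// transaction, so a file is marked processed only if parsing succeeded.
func processFile(store map[string]bool, m *fileMeta, parse func() error) error {
	tx := begin(store)
	if err := parse(); err != nil {
		tx.Rollback()
		return fmt.Errorf("failed to process CSV file %s: %w", m.FileName, err)
	}
	m.Processed = true
	tx.Update(m)
	tx.Commit() // assumed final step; not visible in the listing
	return nil
}

func main() {
	store := map[string]bool{}
	good := &fileMeta{FileName: "a.csv"}
	bad := &fileMeta{FileName: "b.csv"}

	fmt.Println(processFile(store, good, func() error { return nil }))
	fmt.Println(processFile(store, bad, func() error { return errBadHeader }))
	fmt.Println(store)
}
```

Scoping the transaction to a single file means a failure on one file leaves earlier files committed, so a rerun only has to pick up the files still marked unprocessed.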

Callers

nothing calls this directly

Calls 15

processCSVData · Function · 0.85
Wrap · Method · 0.80
GetData · Method · 0.65
GetDal · Method · 0.65
Cursor · Method · 0.65
From · Method · 0.65
Close · Method · 0.65
SetProgress · Method · 0.65
Next · Method · 0.65
Fetch · Method · 0.65
Begin · Method · 0.65
Rollback · Method · 0.65

Tested by

no test coverage detected