处理CSV文件
(taskCtx plugin.SubTaskContext, db dal.Dal, reader io.ReadCloser, fileMeta *models.QDevS3FileMeta)
| 108 | |
| 109 | // 处理CSV文件 |
| 110 | func processCSVData(taskCtx plugin.SubTaskContext, db dal.Dal, reader io.ReadCloser, fileMeta *models.QDevS3FileMeta) errors.Error { |
| 111 | defer reader.Close() |
| 112 | |
| 113 | // Get task data to access Identity Client |
| 114 | data := taskCtx.GetData().(*QDevTaskData) |
| 115 | |
| 116 | csvReader := csv.NewReader(reader) |
| 117 | // 使用默认的逗号分隔符,不需要设置 Comma |
| 118 | csvReader.LazyQuotes = true // 允许非标准引号处理 |
| 119 | csvReader.FieldsPerRecord = -1 // 允许每行字段数不同 |
| 120 | |
| 121 | // 读取标头 |
| 122 | headers, err := csvReader.Read() |
| 123 | taskCtx.GetLogger().Debug("CSV headers: %+v", headers) |
| 124 | if err != nil { |
| 125 | return errors.Convert(err) |
| 126 | } |
| 127 | |
| 128 | // Auto-detect CSV format from headers |
| 129 | isNewFormat := detectUserReportFormat(headers) |
| 130 | if isNewFormat { |
| 131 | taskCtx.GetLogger().Debug("Detected new user_report CSV format") |
| 132 | } else { |
| 133 | taskCtx.GetLogger().Debug("Detected old by_user_analytic CSV format") |
| 134 | } |
| 135 | |
| 136 | // 逐行读取数据 |
| 137 | for { |
| 138 | record, err := csvReader.Read() |
| 139 | if err == io.EOF { |
| 140 | break |
| 141 | } |
| 142 | if err != nil { |
| 143 | return errors.Convert(err) |
| 144 | } |
| 145 | |
| 146 | if isNewFormat { |
| 147 | reportData, err := createUserReportData(taskCtx.GetLogger(), headers, record, fileMeta, data.IdentityClient) |
| 148 | if err != nil { |
| 149 | return errors.Default.Wrap(err, "failed to create user report data") |
| 150 | } |
| 151 | err = db.CreateOrUpdate(reportData) |
| 152 | if err != nil { |
| 153 | return errors.Default.Wrap(err, "failed to save user report data") |
| 154 | } |
| 155 | } else { |
| 156 | // 创建用户数据对象 (updated to include display name resolution) |
| 157 | userData, err := createUserDataWithDisplayName(taskCtx.GetLogger(), headers, record, fileMeta, data.IdentityClient) |
| 158 | if err != nil { |
| 159 | return errors.Default.Wrap(err, "failed to create user data") |
| 160 | } |
| 161 | |
| 162 | err = db.CreateOrUpdate(userData) |
| 163 | if err != nil { |
| 164 | return errors.Default.Wrap(err, "failed to save user data") |
| 165 | } |
| 166 | } |
| 167 | } |
No test coverage detected.