WriteInsertInCsv writes TableDataIR to a storage.ExternalFileWriter in csv type
( pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR TableDataIR, w storage.ExternalFileWriter, metrics *metrics, )
| 290 | |
| 291 | // WriteInsertInCsv writes TableDataIR to a storage.ExternalFileWriter in csv type |
| 292 | func WriteInsertInCsv( |
| 293 | pCtx *tcontext.Context, |
| 294 | cfg *Config, |
| 295 | meta TableMeta, |
| 296 | tblIR TableDataIR, |
| 297 | w storage.ExternalFileWriter, |
| 298 | metrics *metrics, |
| 299 | ) (n uint64, err error) { |
| 300 | fileRowIter := tblIR.Rows() |
| 301 | if !fileRowIter.HasNext() { |
| 302 | return 0, fileRowIter.Error() |
| 303 | } |
| 304 | |
| 305 | bf := pool.Get().(*bytes.Buffer) |
| 306 | if bfCap := bf.Cap(); bfCap < lengthLimit { |
| 307 | bf.Grow(lengthLimit - bfCap) |
| 308 | } |
| 309 | |
| 310 | wp := newWriterPipe(w, cfg.FileSize, UnspecifiedSize, metrics, cfg.Labels) |
| 311 | opt := &csvOption{ |
| 312 | nullValue: cfg.CsvNullValue, |
| 313 | separator: []byte(cfg.CsvSeparator), |
| 314 | delimiter: []byte(cfg.CsvDelimiter), |
| 315 | lineTerminator: []byte(cfg.CsvLineTerminator), |
| 316 | binaryFormat: DialectBinaryFormatMap[cfg.CsvOutputDialect], |
| 317 | } |
| 318 | |
| 319 | // use context.Background here to make sure writerPipe can deplete all the chunks in pipeline |
| 320 | ctx, cancel := tcontext.Background().WithLogger(pCtx.L()).WithCancel() |
| 321 | var wg sync.WaitGroup |
| 322 | wg.Add(1) |
| 323 | go func() { |
| 324 | wp.Run(ctx) |
| 325 | wg.Done() |
| 326 | }() |
| 327 | defer func() { |
| 328 | cancel() |
| 329 | wg.Wait() |
| 330 | }() |
| 331 | |
| 332 | var ( |
| 333 | row = MakeRowReceiver(meta.ColumnTypes()) |
| 334 | counter uint64 |
| 335 | lastCounter uint64 |
| 336 | escapeBackslash = cfg.EscapeBackslash |
| 337 | selectedFields = meta.SelectedField() |
| 338 | ) |
| 339 | |
| 340 | defer func() { |
| 341 | if err != nil { |
| 342 | pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible", |
| 343 | zap.String("database", meta.DatabaseName()), |
| 344 | zap.String("table", meta.TableName()), |
| 345 | zap.Uint64("finished rows", lastCounter), |
| 346 | zap.Uint64("finished size", wp.finishedFileSize), |
| 347 | log.ShortError(err)) |
| 348 | SubGauge(metrics.finishedRowsGauge, float64(lastCounter)) |
| 349 | SubGauge(metrics.finishedSizeGauge, float64(wp.finishedFileSize)) |