MCPcopy
hub / github.com/pingcap/tidb / WriteInsertInCsv

Function WriteInsertInCsv

dumpling/export/writer_util.go:292–418  ·  view source on GitHub ↗

WriteInsertInCsv writes TableDataIR to a storage.ExternalFileWriter in CSV format.

(
	pCtx *tcontext.Context,
	cfg *Config,
	meta TableMeta,
	tblIR TableDataIR,
	w storage.ExternalFileWriter,
	metrics *metrics,
)

Source from the content-addressed store, hash-verified

290
291// WriteInsertInCsv writes TableDataIR to a storage.ExternalFileWriter in csv type
292func WriteInsertInCsv(
293 pCtx *tcontext.Context,
294 cfg *Config,
295 meta TableMeta,
296 tblIR TableDataIR,
297 w storage.ExternalFileWriter,
298 metrics *metrics,
299) (n uint64, err error) {
300 fileRowIter := tblIR.Rows()
301 if !fileRowIter.HasNext() {
302 return 0, fileRowIter.Error()
303 }
304
305 bf := pool.Get().(*bytes.Buffer)
306 if bfCap := bf.Cap(); bfCap < lengthLimit {
307 bf.Grow(lengthLimit - bfCap)
308 }
309
310 wp := newWriterPipe(w, cfg.FileSize, UnspecifiedSize, metrics, cfg.Labels)
311 opt := &csvOption{
312 nullValue: cfg.CsvNullValue,
313 separator: []byte(cfg.CsvSeparator),
314 delimiter: []byte(cfg.CsvDelimiter),
315 lineTerminator: []byte(cfg.CsvLineTerminator),
316 binaryFormat: DialectBinaryFormatMap[cfg.CsvOutputDialect],
317 }
318
319 // use context.Background here to make sure writerPipe can deplete all the chunks in pipeline
320 ctx, cancel := tcontext.Background().WithLogger(pCtx.L()).WithCancel()
321 var wg sync.WaitGroup
322 wg.Add(1)
323 go func() {
324 wp.Run(ctx)
325 wg.Done()
326 }()
327 defer func() {
328 cancel()
329 wg.Wait()
330 }()
331
332 var (
333 row = MakeRowReceiver(meta.ColumnTypes())
334 counter uint64
335 lastCounter uint64
336 escapeBackslash = cfg.EscapeBackslash
337 selectedFields = meta.SelectedField()
338 )
339
340 defer func() {
341 if err != nil {
342 pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
343 zap.String("database", meta.DatabaseName()),
344 zap.String("table", meta.TableName()),
345 zap.Uint64("finished rows", lastCounter),
346 zap.Uint64("finished size", wp.finishedFileSize),
347 log.ShortError(err))
348 SubGauge(metrics.finishedRowsGauge, float64(lastCounter))
349 SubGauge(metrics.finishedSizeGauge, float64(wp.finishedFileSize))

Callers 4

WriteInsertMethod · 0.85
TestWriteInsertInCsvFunction · 0.85

Calls 15

ShortErrorFunction · 0.92
CollectSuccessUnitFunction · 0.92
newWriterPipeFunction · 0.85
MakeRowReceiverFunction · 0.85
SubGaugeFunction · 0.85
escapeCSVFunction · 0.85
AddGaugeFunction · 0.85
WithCancelMethod · 0.80
WithLoggerMethod · 0.80
WarnMethod · 0.80
ShouldSwitchFileMethod · 0.80
RowsMethod · 0.65

Tested by 3

TestWriteInsertInCsvFunction · 0.68