MCPcopy
hub / github.com/pingcap/tidb / WriteInsert

Function WriteInsert

dumpling/export/writer_util.go:144–289  ·  view source on GitHub ↗

WriteInsert writes TableDataIR to a storage.ExternalFileWriter in SQL format.

(
	pCtx *tcontext.Context,
	cfg *Config,
	meta TableMeta,
	tblIR TableDataIR,
	w storage.ExternalFileWriter,
	metrics *metrics,
)

Source from the content-addressed store, hash-verified

142
143// WriteInsert writes TableDataIR to a storage.ExternalFileWriter in sql type
144func WriteInsert(
145 pCtx *tcontext.Context,
146 cfg *Config,
147 meta TableMeta,
148 tblIR TableDataIR,
149 w storage.ExternalFileWriter,
150 metrics *metrics,
151) (n uint64, err error) {
152 fileRowIter := tblIR.Rows()
153 if !fileRowIter.HasNext() {
154 return 0, fileRowIter.Error()
155 }
156
157 bf := pool.Get().(*bytes.Buffer)
158 if bfCap := bf.Cap(); bfCap < lengthLimit {
159 bf.Grow(lengthLimit - bfCap)
160 }
161
162 wp := newWriterPipe(w, cfg.FileSize, cfg.StatementSize, metrics, cfg.Labels)
163
164 // use context.Background here to make sure writerPipe can deplete all the chunks in pipeline
165 ctx, cancel := tcontext.Background().WithLogger(pCtx.L()).WithCancel()
166 var wg sync.WaitGroup
167 wg.Add(1)
168 go func() {
169 wp.Run(ctx)
170 wg.Done()
171 }()
172 defer func() {
173 cancel()
174 wg.Wait()
175 }()
176
177 specCmtIter := meta.SpecialComments()
178 for specCmtIter.HasNext() {
179 bf.WriteString(specCmtIter.Next())
180 bf.WriteByte('\n')
181 }
182 wp.currentFileSize += uint64(bf.Len())
183
184 var (
185 insertStatementPrefix string
186 row = MakeRowReceiver(meta.ColumnTypes())
187 counter uint64
188 lastCounter uint64
189 escapeBackslash = cfg.EscapeBackslash
190 )
191
192 defer func() {
193 if err != nil {
194 pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
195 zap.String("database", meta.DatabaseName()),
196 zap.String("table", meta.TableName()),
197 zap.Uint64("finished rows", lastCounter),
198 zap.Uint64("finished size", wp.finishedFileSize),
199 log.ShortError(err))
200 SubGauge(metrics.finishedRowsGauge, float64(lastCounter))
201 SubGauge(metrics.finishedSizeGauge, float64(wp.finishedFileSize))

Callers 4

WriteInsert (Method) · 0.85
TestWriteInsert (Function) · 0.85
TestSQLDataTypes (Function) · 0.85

Calls 15

ShortError (Function) · 0.92
CollectSuccessUnit (Function) · 0.92
newWriterPipe (Function) · 0.85
MakeRowReceiver (Function) · 0.85
SubGauge (Function) · 0.85
wrapBackTicks (Function) · 0.85
AddGauge (Function) · 0.85
WithCancel (Method) · 0.80
WithLogger (Method) · 0.80
Warn (Method) · 0.80
AddFileSize (Method) · 0.80
ShouldSwitchStatement (Method) · 0.80

Tested by 3

TestWriteInsert (Function) · 0.68
TestSQLDataTypes (Function) · 0.68