MCPcopy
hub / github.com/apache/devlake / putBatchData

Function putBatchData

backend/plugins/starrocks/tasks/tasks.go:381–460  ·  view source on GitHub ↗

put batch size data to database

(c plugin.SubTaskContext, starrocksTmpTable, table string, data []map[string]interface{}, config *StarRocksConfig, offset int)

Source from the content-addressed store, hash-verified

379
380// put batch size data to database
381func putBatchData(c plugin.SubTaskContext, starrocksTmpTable, table string, data []map[string]interface{}, config *StarRocksConfig, offset int) error {
382 logger := c.GetLogger()
383 // insert data to tmp table
384 loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable)
385 headers := map[string]string{
386 "format": "json",
387 "strip_outer_array": "true",
388 "Expect": "100-continue",
389 "ignore_json_size": "true",
390 "Connection": "close",
391 }
392 jsonData, err := json.Marshal(data)
393 if err != nil {
394 return err
395 }
396 client := http.Client{
397 CheckRedirect: func(req *http.Request, via []*http.Request) error {
398 return http.ErrUseLastResponse
399 },
400 }
401 req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData))
402 if err != nil {
403 return err
404 }
405 req.SetBasicAuth(config.User, config.Password)
406 for k, v := range headers {
407 req.Header.Set(k, v)
408 }
409 resp, err := client.Do(req)
410 if err != nil {
411 return err
412 }
413 defer resp.Body.Close()
414 var b []byte
415
416 if resp.StatusCode == 307 {
417 var location *url.URL
418 location, err = resp.Location()
419 if err != nil {
420 return err
421 }
422 req, err = http.NewRequest(http.MethodPut, location.String(), bytes.NewBuffer(jsonData))
423 if err != nil {
424 return err
425 }
426 req.SetBasicAuth(config.User, config.Password)
427 for k, v := range headers {
428 req.Header.Set(k, v)
429 }
430 respRetry, err := client.Do(req)
431 if err != nil {
432 return err
433 }
434 defer respRetry.Body.Close()
435 b, err = io.ReadAll(respRetry.Body)
436 if err != nil {
437 return err
438 }

Callers 1

copyDataToDstFunction · 0.85

Calls 6

DoMethod · 0.80
GetLoggerMethod · 0.65
CloseMethod · 0.65
ErrorMethod · 0.65
DebugMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected