put batch size data to database
(c plugin.SubTaskContext, starrocksTmpTable, table string, data []map[string]interface{}, config *StarRocksConfig, offset int)
| 379 | |
| 380 | // put batch size data to database |
| 381 | func putBatchData(c plugin.SubTaskContext, starrocksTmpTable, table string, data []map[string]interface{}, config *StarRocksConfig, offset int) error { |
| 382 | logger := c.GetLogger() |
| 383 | // insert data to tmp table |
| 384 | loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable) |
| 385 | headers := map[string]string{ |
| 386 | "format": "json", |
| 387 | "strip_outer_array": "true", |
| 388 | "Expect": "100-continue", |
| 389 | "ignore_json_size": "true", |
| 390 | "Connection": "close", |
| 391 | } |
| 392 | jsonData, err := json.Marshal(data) |
| 393 | if err != nil { |
| 394 | return err |
| 395 | } |
| 396 | client := http.Client{ |
| 397 | CheckRedirect: func(req *http.Request, via []*http.Request) error { |
| 398 | return http.ErrUseLastResponse |
| 399 | }, |
| 400 | } |
| 401 | req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData)) |
| 402 | if err != nil { |
| 403 | return err |
| 404 | } |
| 405 | req.SetBasicAuth(config.User, config.Password) |
| 406 | for k, v := range headers { |
| 407 | req.Header.Set(k, v) |
| 408 | } |
| 409 | resp, err := client.Do(req) |
| 410 | if err != nil { |
| 411 | return err |
| 412 | } |
| 413 | defer resp.Body.Close() |
| 414 | var b []byte |
| 415 | |
| 416 | if resp.StatusCode == 307 { |
| 417 | var location *url.URL |
| 418 | location, err = resp.Location() |
| 419 | if err != nil { |
| 420 | return err |
| 421 | } |
| 422 | req, err = http.NewRequest(http.MethodPut, location.String(), bytes.NewBuffer(jsonData)) |
| 423 | if err != nil { |
| 424 | return err |
| 425 | } |
| 426 | req.SetBasicAuth(config.User, config.Password) |
| 427 | for k, v := range headers { |
| 428 | req.Header.Set(k, v) |
| 429 | } |
| 430 | respRetry, err := client.Do(req) |
| 431 | if err != nil { |
| 432 | return err |
| 433 | } |
| 434 | defer respRetry.Body.Close() |
| 435 | b, err = io.ReadAll(respRetry.Body) |
| 436 | if err != nil { |
| 437 | return err |
| 438 | } |