(ctx context.Context, items []*ImportItem)
| 367 | } |
| 368 | |
| 369 | func (meta *baseMeta) ParallelImport(ctx context.Context, items []*ImportItem) error { |
| 370 | meta.tc.Trace(telemetry.Info, "ParallelImport Enter") |
| 371 | defer meta.tc.Trace(telemetry.Info, "ParallelImport Leave") |
| 372 | |
| 373 | total := len(items) |
| 374 | itemsCh := make(chan *ImportItem, total) |
| 375 | for _, item := range items { |
| 376 | itemsCh <- item |
| 377 | } |
| 378 | close(itemsCh) |
| 379 | |
| 380 | wp := workerpool.NewWorkPool(meta.parallelism) |
| 381 | |
| 382 | wp.Run(func(i interface{}) error { |
| 383 | idx := i.(int) |
| 384 | |
| 385 | // Noop if tfclient is set |
| 386 | if meta.tfclient != nil { |
| 387 | return nil |
| 388 | } |
| 389 | |
| 390 | stateFile := filepath.Join(meta.importBaseDirs[idx], "terraform.tfstate") |
| 391 | |
| 392 | // Don't merge the state file if this import dir doesn't contain one, which can be because this import dir either imported nothing or encountered an import error. |
| 393 | if _, err := os.Stat(stateFile); os.IsNotExist(err) { |
| 394 | return nil |
| 395 | } |
| 396 | // Ensure the state file is removed after this round of import, preparing for the next round. |
| 397 | defer os.Remove(stateFile) |
| 398 | |
| 399 | meta.Logger().Debug("Merging terraform state file (tfmerge)", "file", stateFile) |
| 400 | newState, err := tfmerge.Merge(ctx, meta.tf, meta.baseState, stateFile) |
| 401 | if err != nil { |
| 402 | return fmt.Errorf("failed to merge state file: %v", err) |
| 403 | } |
| 404 | meta.baseState = newState |
| 405 | |
| 406 | return nil |
| 407 | }) |
| 408 | |
| 409 | for i := 0; i < meta.parallelism; i++ { |
| 410 | i := i |
| 411 | wp.AddTask(func() (interface{}, error) { |
| 412 | for item := range itemsCh { |
| 413 | iitem := config.ImportItem{ |
| 414 | AzureResourceID: item.AzureResourceID, |
| 415 | TFResourceId: item.TFResourceId, |
| 416 | ImportError: item.ImportError, |
| 417 | TFAddr: item.TFAddr, |
| 418 | } |
| 419 | startTime := time.Now() |
| 420 | if meta.preImportHook != nil { |
| 421 | meta.preImportHook(startTime, iitem) |
| 422 | } |
| 423 | meta.importItem(ctx, item, i) |
| 424 | if meta.postImportHook != nil { |
| 425 | meta.postImportHook(startTime, iitem) |
| 426 | } |
nothing calls this directly
no test coverage detected