MCPcopy
hub / github.com/kopia/kopia / run

Method run

cli/command_snapshot_migrate.go:48–141  ·  view source on GitHub ↗
(ctx context.Context, destRepo repo.RepositoryWriter)

Source from the content-addressed store, hash-verified

46}
47
48func (c *commandSnapshotMigrate) run(ctx context.Context, destRepo repo.RepositoryWriter) error {
49 sourceRepo, err := c.openSourceRepo(ctx)
50 if err != nil {
51 return errors.Wrap(err, "can't open source repository")
52 }
53
54 defer sourceRepo.Close(ctx) //nolint:errcheck
55
56 sources, err := c.getSourcesToMigrate(ctx, sourceRepo)
57 if err != nil {
58 return errors.Wrap(err, "can't retrieve sources")
59 }
60
61 semaphore := make(chan struct{}, c.migrateParallel)
62
63 var (
64 wg sync.WaitGroup
65 mu sync.Mutex
66 canceled bool
67 activeUploaders = map[snapshot.SourceInfo]*upload.Uploader{}
68 )
69
70 c.svc.getProgress().StartShared()
71
72 c.svc.onTerminate(func() {
73 mu.Lock()
74 defer mu.Unlock()
75
76 if canceled {
77 return
78 }
79
80 canceled = true
81
82 for s, u := range activeUploaders {
83 log(ctx).Infof("canceling active uploader for %v", s)
84 u.Cancel()
85 }
86 })
87
88 if c.migratePolicies {
89 if c.migrateAll {
90 err = c.migrateAllPolicies(ctx, sourceRepo, destRepo)
91 } else {
92 err = c.migratePoliciesForSources(ctx, sourceRepo, destRepo, sources)
93 }
94
95 if err != nil {
96 return errors.Wrap(err, "unable to migrate policies")
97 }
98 }
99
100 for _, s := range sources {
101 // start a new uploader unless already canceled
102 mu.Lock()
103
104 if canceled {
105 mu.Unlock()

Callers

nothing calls this directly

Calls 15

openSourceRepoMethod · 0.95
getSourcesToMigrateMethod · 0.95
migrateAllPoliciesMethod · 0.95
migrateSingleSourceMethod · 0.95
NewUploaderFunction · 0.92
StartSharedMethod · 0.80
ErrorfMethod · 0.80
FinishSharedMethod · 0.80
printStderrMethod · 0.80
InfoMethod · 0.80
CloseMethod · 0.65

Tested by

no test coverage detected