MCPcopy
hub / github.com/wal-g/wal-g / tryExtractFiles

Function tryExtractFiles

internal/extract.go:201–253  ·  view source on GitHub ↗

TODO : unit tests

(files []ReaderMaker,
	tarInterpreter TarInterpreter,
	downloadingConcurrency int)

Source from the content-addressed store, hash-verified

199
200// TODO : unit tests
201func tryExtractFiles(files []ReaderMaker,
202 tarInterpreter TarInterpreter,
203 downloadingConcurrency int) (failed []ReaderMaker) {
204 downloadingContext := context.TODO()
205 downloadingSemaphore := semaphore.NewWeighted(int64(downloadingConcurrency))
206 crypter := ConfigureCrypter()
207 isFailed := sync.Map{}
208
209 for _, file := range files {
210 err := downloadingSemaphore.Acquire(downloadingContext, 1)
211 if err != nil {
212 tracelog.ErrorLogger.Println(err)
213 return files //Should never happen, but if we are asked to cancel - consider all files unfinished
214 }
215 fileClosure := file
216
217 go func() {
218 defer downloadingSemaphore.Release(1)
219
220 readCloser, err := fileClosure.Reader()
221 if err == nil {
222 defer utility.LoggedClose(readCloser, "")
223
224 filePath := fileClosure.StoragePath()
225 var extractingReader io.ReadCloser
226 extractingReader, err = DecryptAndDecompressTar(readCloser, filePath, crypter)
227 if err == nil {
228 defer extractingReader.Close()
229 err = extractFile(tarInterpreter, extractingReader, fileClosure)
230 err = errors.Wrapf(err, "Extraction error in %s", filePath)
231 tracelog.InfoLogger.Printf("Finished extraction of %s", filePath)
232 }
233 }
234
235 if err != nil {
236 isFailed.Store(fileClosure, true)
237 tracelog.ErrorLogger.Println(err)
238 }
239 }()
240 }
241
242 err := downloadingSemaphore.Acquire(downloadingContext, int64(downloadingConcurrency))
243 if err != nil {
244 tracelog.ErrorLogger.Println(err)
245 return files //Should never happen, but if we are asked to cancel - consider all files unfinished
246 }
247
248 isFailed.Range(func(failedFile, _ interface{}) bool {
249 failed = append(failed, failedFile.(ReaderMaker))
250 return true
251 })
252 return failed
253}
254
255func readTrailingZeros(r io.Reader) error {
256 // on first iteration we read small chunk

Callers 1

ExtractAllWithSleeperFunction · 0.85

Calls 10

LoggedCloseFunction · 0.92
ConfigureCrypterFunction · 0.85
DecryptAndDecompressTarFunction · 0.85
extractFileFunction · 0.85
PrintlnMethod · 0.80
ReaderMethod · 0.65
StoragePathMethod · 0.65
CloseMethod · 0.65
StoreMethod · 0.45
RangeMethod · 0.45

Tested by

no test coverage detected