enqueueAndCollectRetriesFor makes a Batch API call and returns a "next" batch containing all of the objects that failed from the previous batch and had retries available to them. If an error was encountered while making the API request, _all_ of the items from the previous batch (that have retries available to them) are enqueued for retry in the "next" batch.
(batch batch)
| 539 | // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been |
| 540 | // processed. |
| 541 | func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) { |
| 542 | q.Upgrade() |
| 543 | |
| 544 | next := q.makeBatch() |
| 545 | tracerx.Printf("tq: sending batch of size %d", len(batch)) |
| 546 | |
| 547 | enqueueRetry := func(t *objectTuple, err error, readyTime *time.Time) { |
| 548 | count := q.rc.Increment(t.Oid) |
| 549 | |
| 550 | if !t.retryLaterTime.IsZero() { |
| 551 | t.ReadyTime = t.retryLaterTime |
| 552 | t.retryLaterTime = time.Time{} |
| 553 | } else if readyTime == nil { |
| 554 | t.ReadyTime = q.rc.ReadyTime(t.Oid) |
| 555 | } else { |
| 556 | t.ReadyTime = *readyTime |
| 557 | } |
| 558 | delay := time.Until(t.ReadyTime).Seconds() |
| 559 | |
| 560 | var errMsg string |
| 561 | if err != nil { |
| 562 | errMsg = fmt.Sprintf(": %s", err) |
| 563 | } |
| 564 | tracerx.Printf("tq: enqueue retry #%d after %.2fs for %q (size: %d)%s", count, delay, t.Oid, t.Size, errMsg) |
| 565 | next = append(next, t) |
| 566 | } |
| 567 | |
| 568 | q.meter.Pause() |
| 569 | var bRes *BatchResponse |
| 570 | manifest := q.manifest.Upgrade() |
| 571 | if manifest.standaloneTransferAgent != "" { |
| 572 | // Trust the external transfer agent can do everything by itself. |
| 573 | objects := make([]*Transfer, 0, len(batch)) |
| 574 | for _, t := range batch { |
| 575 | objects = append(objects, &Transfer{Oid: t.Oid, Size: t.Size, Path: t.Path}) |
| 576 | } |
| 577 | bRes = &BatchResponse{ |
| 578 | Objects: objects, |
| 579 | TransferAdapterName: manifest.standaloneTransferAgent, |
| 580 | } |
| 581 | } else { |
| 582 | // Query the Git LFS server for what transfer method to use and |
| 583 | // details such as URLs, authentication, etc. |
| 584 | var err error |
| 585 | bRes, err = Batch(q.manifest, q.direction, q.remote, q.ref, batch.ToTransfers()) |
| 586 | if err != nil { |
| 587 | var hasNonRetriableObjects = false |
| 588 | // If there was an error making the batch API call, mark all of |
| 589 | // the objects for retry if possible. If any should not be retried, |
| 590 | // they will be marked as failed. |
| 591 | for _, t := range batch { |
| 592 | if q.canRetryObject(t.Oid, err) { |
| 593 | enqueueRetry(t, err, nil) |
| 594 | } else if readyTime, canRetry := q.canRetryObjectLater(t.Oid, err); canRetry { |
| 595 | enqueueRetry(t, err, &readyTime) |
| 596 | } else { |
| 597 | hasNonRetriableObjects = true |
| 598 | q.wait.Done() |
no test coverage detected