addToAdapter adds the given "pending" transfers to the transfer adapters and returns a channel of Transfers that are to be retried in the next batch. After all of the items in the batch have been processed, the channel is closed. addToAdapter returns immediately, and does not block.
(e lfshttp.Endpoint, pending []*Transfer)
| 709 | // |
| 710 | // addToAdapter returns immediately, and does not block. |
| 711 | func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-chan *objectTuple { |
| 712 | q.Upgrade() |
| 713 | |
| 714 | retries := make(chan *objectTuple, len(pending)) |
| 715 | |
| 716 | if err := q.ensureAdapterBegun(e); err != nil { |
| 717 | close(retries) |
| 718 | |
| 719 | q.errorc <- err |
| 720 | for _, t := range pending { |
| 721 | q.Skip(t.Size) |
| 722 | q.wait.Done() |
| 723 | } |
| 724 | |
| 725 | return retries |
| 726 | } |
| 727 | |
| 728 | present, missingResults := q.partitionTransfers(pending) |
| 729 | |
| 730 | go func() { |
| 731 | defer close(retries) |
| 732 | |
| 733 | var results <-chan TransferResult |
| 734 | if q.dryRun { |
| 735 | results = q.makeDryRunResults(present) |
| 736 | } else { |
| 737 | results = q.adapter.Add(present...) |
| 738 | } |
| 739 | |
| 740 | for _, res := range missingResults { |
| 741 | q.handleTransferResult(res, retries) |
| 742 | } |
| 743 | for res := range results { |
| 744 | q.handleTransferResult(res, retries) |
| 745 | } |
| 746 | }() |
| 747 | |
| 748 | return retries |
| 749 | } |
| 750 | |
| 751 | func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) { |
| 752 | q.Upgrade() |
no test coverage detected