| 1523 | } |
| 1524 | |
| 1525 | func (f *sendReceiveFolder) pullerRoutine(ctx context.Context, in <-chan pullBlockState, out chan<- *sharedPullerState) { |
| 1526 | requestLimiter := semaphore.New(f.PullerMaxPendingKiB * 1024) |
| 1527 | var wg sync.WaitGroup |
| 1528 | |
| 1529 | for state := range in { |
| 1530 | if state.failed() != nil { |
| 1531 | out <- state.sharedPullerState |
| 1532 | continue |
| 1533 | } |
| 1534 | |
| 1535 | f.setState(FolderSyncing) // Does nothing if already FolderSyncing |
| 1536 | |
| 1537 | // The requestLimiter limits how many pending block requests we have |
| 1538 | // ongoing at any given time, based on the size of the blocks |
| 1539 | // themselves. |
| 1540 | |
| 1541 | bytes := state.block.Size |
| 1542 | |
| 1543 | if bytes == 0 { |
| 1544 | // Pulling zero bytes is a no-op. |
| 1545 | state.pullDone(state.block) |
| 1546 | out <- state.sharedPullerState |
| 1547 | continue |
| 1548 | } |
| 1549 | |
| 1550 | if err := requestLimiter.TakeWithContext(ctx, bytes); err != nil { |
| 1551 | state.fail(err) |
| 1552 | out <- state.sharedPullerState |
| 1553 | continue |
| 1554 | } |
| 1555 | |
| 1556 | wg.Go(func() { |
| 1557 | defer requestLimiter.Give(bytes) |
| 1558 | f.pullBlock(ctx, state, out) |
| 1559 | }) |
| 1560 | } |
| 1561 | wg.Wait() |
| 1562 | } |
| 1563 | |
| 1564 | func (f *sendReceiveFolder) pullBlock(ctx context.Context, state pullBlockState, out chan<- *sharedPullerState) { |
| 1565 | // Get an fd to the temporary file. Technically we don't need it until |