MCPcopy
hub / github.com/syncthing/syncthing / pullerRoutine

Method pullerRoutine

lib/model/folder_sendrecv.go:1525–1562  ·  view source on GitHub ↗
(ctx context.Context, in <-chan pullBlockState, out chan<- *sharedPullerState)

Source from the content-addressed store, hash-verified

1523}
1524
1525func (f *sendReceiveFolder) pullerRoutine(ctx context.Context, in <-chan pullBlockState, out chan<- *sharedPullerState) {
1526 requestLimiter := semaphore.New(f.PullerMaxPendingKiB * 1024)
1527 var wg sync.WaitGroup
1528
1529 for state := range in {
1530 if state.failed() != nil {
1531 out <- state.sharedPullerState
1532 continue
1533 }
1534
1535 f.setState(FolderSyncing) // Does nothing if already FolderSyncing
1536
1537 // The requestLimiter limits how many pending block requests we have
1538 // ongoing at any given time, based on the size of the blocks
1539 // themselves.
1540
1541 bytes := state.block.Size
1542
1543 if bytes == 0 {
1544 // Pulling zero bytes is a no-op.
1545 state.pullDone(state.block)
1546 out <- state.sharedPullerState
1547 continue
1548 }
1549
1550 if err := requestLimiter.TakeWithContext(ctx, bytes); err != nil {
1551 state.fail(err)
1552 out <- state.sharedPullerState
1553 continue
1554 }
1555
1556 wg.Go(func() {
1557 defer requestLimiter.Give(bytes)
1558 f.pullBlock(ctx, state, out)
1559 })
1560 }
1561 wg.Wait()
1562}
1563
1564func (f *sendReceiveFolder) pullBlock(ctx context.Context, state pullBlockState, out chan<- *sharedPullerState) {
1565 // Get an fd to the temporary file. Technically we don't need it until

Callers 3

pullerIterationMethod · 0.95
TestPullCtxCancelFunction · 0.80

Calls 9

pullBlockMethod · 0.95
NewFunction · 0.92
failedMethod · 0.80
setStateMethod · 0.80
pullDoneMethod · 0.80
failMethod · 0.80
WaitMethod · 0.65
TakeWithContextMethod · 0.45
GiveMethod · 0.45

Tested by 2

TestPullCtxCancelFunction · 0.64