MCPcopy
hub / github.com/dragonflyoss/dragonfly / handleRegisterPeerRequest

Method handleRegisterPeerRequest

scheduler/service/service_v2.go:1300–1450  ·  view source on GitHub ↗

handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.

(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest)

Source from the content-addressed store, hash-verified

1298
1299// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
1300func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
1301 // Handle resource included host, task, and peer.
1302 host, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
1303 if err != nil {
1304 return err
1305 }
1306 host.ConcurrentRegisterCount.Inc()
1307 defer host.ConcurrentRegisterCount.Dec()
1308
1309 // Collect RegisterPeerCount metrics.
1310 priority := peer.CalculatePriority(v.dynconfig)
1311 metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
1312 peer.Host.Type.Name()).Inc()
1313
1314 // Provides an exponential delay with jitter to prevent thundering herd problems. When a host has many
1315 // concurrent registration requests, later requests are delayed progressively to avoid overwhelming the
1316 // source with simultaneous back-to-source tasks from a single host.
1317 //
1318 // Note that the delay is skipped for seed peers for the following reasons:
1319 // 1. Seed peers serve latency-sensitive workloads such as nydus, which issue large numbers of small
1320 // IO requests. Applying a backoff delay would significantly increase the latency of these requests.
1321 // 2. The number of seed peers in a cluster is controlled and limited, and the scheduler can control
1322 // the concurrency of back-to-source tasks for seed peers. Therefore, there is no risk of a
1323 // thundering herd against the source, and the delay is unnecessary.
1324 if peer.Host.Type != types.HostTypeSuperSeed {
1325 if err := pkgtime.ExponentialDelayWithJitter(ctx, uint(host.ConcurrentRegisterCount.Load()), baseDelayForRegisterPeer, maxDelayForRegisterPeer); err != nil {
1326 // Collect RegisterPeerFailureCount metrics.
1327 metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
1328 peer.Host.Type.Name()).Inc()
1329 return status.Error(codes.Internal, err.Error())
1330 }
1331 }
1332
1333 blocklist := set.NewSafeSet[string]()
1334 blocklist.Add(peer.ID)
1335 download := proto.Clone(req.Download).(*commonv2.Download)
1336 switch {
1337 // If scheduler trigger seed peer download back-to-source,
1338 // the needBackToSource flag should be true.
1339 case download.GetNeedBackToSource():
1340 peer.Log.Info("peer need back to source")
1341 peer.NeedBackToSource.Store(true)
1342 // If task is pending, failed, leave, or succeeded and has no available peer,
1343 // scheduler trigger seed peer download back-to-source.
1344 case task.FSM.Is(standard.TaskStatePending) ||
1345 task.FSM.Is(standard.TaskStateFailed) ||
1346 task.FSM.Is(standard.TaskStateLeave) ||
1347 task.FSM.Is(standard.TaskStateSucceeded) &&
1348 !task.HasAvailablePeer(hostID, blocklist):
1349
1350 // If HostType is normal, trigger seed peer download back-to-source.
1351 if host.Type == types.HostTypeNormal && v.resource.SeedPeer().HasAvailable() {
1352 // If trigger the seed peer download back-to-source,
1353 // the need back-to-source flag should be true.
1354 download.NeedBackToSource = true
1355
1356 // Output path should be empty, prevent the seed peer
1357 // copy file to output path.

Callers 2

AnnouncePeerMethod · 0.95

Calls 15

handleResourceMethod · 0.95
AddMethod · 0.95
CalculatePriorityMethod · 0.80
NameMethod · 0.80
InfoMethod · 0.80
HasAvailablePeerMethod · 0.80
LoadMethod · 0.65
StoreMethod · 0.65
HasAvailableMethod · 0.65
SeedPeerMethod · 0.65

Tested by 1