handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest)
| 1298 | |
| 1299 | // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. |
| 1300 | func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { |
| 1301 | // Handle the resources involved, including the host, task, and peer. |
| 1302 | host, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload()) |
| 1303 | if err != nil { |
| 1304 | return err |
| 1305 | } |
| 1306 | host.ConcurrentRegisterCount.Inc() |
| 1307 | defer host.ConcurrentRegisterCount.Dec() |
| 1308 | |
| 1309 | // Collect RegisterPeerCount metrics. |
| 1310 | priority := peer.CalculatePriority(v.dynconfig) |
| 1311 | metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), |
| 1312 | peer.Host.Type.Name()).Inc() |
| 1313 | |
| 1314 | // Provides an exponential delay with jitter to prevent thundering herd problems. When a host has many |
| 1315 | // concurrent registration requests, later requests are delayed progressively to avoid overwhelming the |
| 1316 | // source with simultaneous back-to-source tasks from a single host. |
| 1317 | // |
| 1318 | // Note that the delay is skipped for seed peers for the following reasons: |
| 1319 | // 1. Seed peers serve latency-sensitive workloads such as nydus, which issue large numbers of small |
| 1320 | // IO requests. Applying a backoff delay would significantly increase the latency of these requests. |
| 1321 | // 2. The number of seed peers in a cluster is controlled and limited, and the scheduler can control |
| 1322 | // the concurrency of back-to-source tasks for seed peers. Therefore, there is no risk of a |
| 1323 | // thundering herd against the source, and the delay is unnecessary. |
| 1324 | if peer.Host.Type != types.HostTypeSuperSeed { |
| 1325 | if err := pkgtime.ExponentialDelayWithJitter(ctx, uint(host.ConcurrentRegisterCount.Load()), baseDelayForRegisterPeer, maxDelayForRegisterPeer); err != nil { |
| 1326 | // Collect RegisterPeerFailureCount metrics. |
| 1327 | metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), |
| 1328 | peer.Host.Type.Name()).Inc() |
| 1329 | return status.Error(codes.Internal, err.Error()) |
| 1330 | } |
| 1331 | } |
| 1332 | |
| 1333 | blocklist := set.NewSafeSet[string]() |
| 1334 | blocklist.Add(peer.ID) |
| 1335 | download := proto.Clone(req.Download).(*commonv2.Download) |
| 1336 | switch { |
| 1337 | // If the scheduler triggers the seed peer to download back-to-source, |
| 1338 | // the needBackToSource flag should be true. |
| 1339 | case download.GetNeedBackToSource(): |
| 1340 | peer.Log.Info("peer need back to source") |
| 1341 | peer.NeedBackToSource.Store(true) |
| 1342 | // If the task is pending, failed, or leave, or if it has succeeded but has no |
| 1343 | // available peer, the scheduler triggers the seed peer to download back-to-source. |
| 1344 | case task.FSM.Is(standard.TaskStatePending) || |
| 1345 | task.FSM.Is(standard.TaskStateFailed) || |
| 1346 | task.FSM.Is(standard.TaskStateLeave) || |
| 1347 | task.FSM.Is(standard.TaskStateSucceeded) && |
| 1348 | !task.HasAvailablePeer(hostID, blocklist): |
| 1349 | |
| 1350 | // If HostType is normal and a seed peer is available, trigger the seed peer to download back-to-source. |
| 1351 | if host.Type == types.HostTypeNormal && v.resource.SeedPeer().HasAvailable() { |
| 1352 | // When triggering the seed peer to download back-to-source, |
| 1353 | // the need back-to-source flag should be true. |
| 1354 | download.NeedBackToSource = true |
| 1355 | |
| 1356 | // The output path should be empty to prevent the seed peer |
| 1357 | // from copying the file to the output path. |