MCPcopy
hub / github.com/dragonflyoss/dragonfly / RegisterPeerTask

Method RegisterPeerTask

scheduler/service/service_v1.go:83–172  ·  view source on GitHub ↗

RegisterPeerTask registers a peer and triggers the seed peer download task.

(ctx context.Context, req *schedulerv1.PeerTaskRequest)

Source from the content-addressed store, hash-verified

81
82// RegisterPeerTask registers peer and triggers seed peer download task.
83func (v *V1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) {
84 log := logger.WithPeer(req.PeerHost.GetId(), req.GetTaskId(), req.GetPeerId())
85 log.Infof("register peer task request: %#v", req)
86
87 // Store resource.
88 task := v.storeTask(ctx, req, commonv2.TaskType_STANDARD)
89 host := v.storeHost(ctx, req.GetPeerHost())
90 peer := v.storePeer(ctx, req.GetPeerId(), req.UrlMeta.GetPriority(), req.UrlMeta.GetRange(), task, host)
91
92 // Prefetch the entire task.
93 if req.GetPrefetch() {
94 go func() {
95 if _, err := v.prefetchTask(ctx, req); err != nil {
96 peer.Log.Errorf("prefetch task failed: %s", err.Error())
97 }
98 }()
99 }
100
101 // Trigger the first download of the task.
102 if err := v.triggerTask(ctx, req, task, host, peer, v.dynconfig); err != nil {
103 peer.Log.Error(err)
104 v.handleRegisterFailure(ctx, peer)
105 return nil, dferrors.New(commonv1.Code_SchedForbidden, err.Error())
106 }
107
108 // If the task does not succeed, it is scheduled as a normal task.
109 if !task.FSM.Is(resource.TaskStateSucceeded) {
110 peer.Log.Infof("register as normal task, because of task state is %s",
111 task.FSM.Current())
112
113 result, err := v.registerNormalTask(ctx, peer)
114 if err != nil {
115 peer.Log.Error(err)
116 v.handleRegisterFailure(ctx, peer)
117 return nil, dferrors.New(commonv1.Code_SchedError, err.Error())
118 }
119
120 return result, nil
121 }
122
123 // If SizeScope is SizeScope_UNKNOW, then register as SizeScope_NORMAL.
124 sizeScope := types.SizeScopeV2ToV1(task.SizeScope())
125 peer.Log.Infof("task size scope is %s", sizeScope)
126
127 // The task state is TaskStateSucceeded and SizeScope is not invalid.
128 switch sizeScope {
129 case commonv1.SizeScope_EMPTY:
130 result, err := v.registerEmptyTask(ctx, peer)
131 if err != nil {
132 peer.Log.Error(err)
133 v.handleRegisterFailure(ctx, peer)
134 return nil, dferrors.New(commonv1.Code_SchedError, err.Error())
135 }
136
137 return result, nil
138 case commonv1.SizeScope_TINY:
139 // Validate data of direct piece.
140 if !peer.Task.CanReuseDirectPiece() {

Callers 1

Calls 15

storeTaskMethod · 0.95
storeHostMethod · 0.95
storePeerMethod · 0.95
prefetchTaskMethod · 0.95
triggerTaskMethod · 0.95
handleRegisterFailureMethod · 0.95
registerNormalTaskMethod · 0.95
registerEmptyTaskMethod · 0.95
registerTinyTaskMethod · 0.95
registerSmallTaskMethod · 0.95
NewFunction · 0.92
SizeScopeV2ToV1Function · 0.92

Tested by 1