RegisterPeerTask registers peer and triggers seed peer download task.
(ctx context.Context, req *schedulerv1.PeerTaskRequest)
| 81 | |
| 82 | // RegisterPeerTask registers peer and triggers seed peer download task. |
| 83 | func (v *V1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) { |
| 84 | log := logger.WithPeer(req.PeerHost.GetId(), req.GetTaskId(), req.GetPeerId()) |
| 85 | log.Infof("register peer task request: %#v", req) |
| 86 | |
| 87 | // Store resource. |
| 88 | task := v.storeTask(ctx, req, commonv2.TaskType_STANDARD) |
| 89 | host := v.storeHost(ctx, req.GetPeerHost()) |
| 90 | peer := v.storePeer(ctx, req.GetPeerId(), req.UrlMeta.GetPriority(), req.UrlMeta.GetRange(), task, host) |
| 91 | |
| 92 | // Prefetch the entire task. |
| 93 | if req.GetPrefetch() { |
| 94 | go func() { |
| 95 | if _, err := v.prefetchTask(ctx, req); err != nil { |
| 96 | peer.Log.Errorf("prefetch task failed: %s", err.Error()) |
| 97 | } |
| 98 | }() |
| 99 | } |
| 100 | |
| 101 | // Trigger the first download of the task. |
| 102 | if err := v.triggerTask(ctx, req, task, host, peer, v.dynconfig); err != nil { |
| 103 | peer.Log.Error(err) |
| 104 | v.handleRegisterFailure(ctx, peer) |
| 105 | return nil, dferrors.New(commonv1.Code_SchedForbidden, err.Error()) |
| 106 | } |
| 107 | |
| 108 | // If the task does not succeed, it is scheduled as a normal task. |
| 109 | if !task.FSM.Is(resource.TaskStateSucceeded) { |
| 110 | peer.Log.Infof("register as normal task, because of task state is %s", |
| 111 | task.FSM.Current()) |
| 112 | |
| 113 | result, err := v.registerNormalTask(ctx, peer) |
| 114 | if err != nil { |
| 115 | peer.Log.Error(err) |
| 116 | v.handleRegisterFailure(ctx, peer) |
| 117 | return nil, dferrors.New(commonv1.Code_SchedError, err.Error()) |
| 118 | } |
| 119 | |
| 120 | return result, nil |
| 121 | } |
| 122 | |
| 123 | // If SizeScope is SizeScope_UNKNOW, then register as SizeScope_NORMAL. |
| 124 | sizeScope := types.SizeScopeV2ToV1(task.SizeScope()) |
| 125 | peer.Log.Infof("task size scope is %s", sizeScope) |
| 126 | |
| 127 | // The task state is TaskStateSucceeded and SizeScope is not invalid. |
| 128 | switch sizeScope { |
| 129 | case commonv1.SizeScope_EMPTY: |
| 130 | result, err := v.registerEmptyTask(ctx, peer) |
| 131 | if err != nil { |
| 132 | peer.Log.Error(err) |
| 133 | v.handleRegisterFailure(ctx, peer) |
| 134 | return nil, dferrors.New(commonv1.Code_SchedError, err.Error()) |
| 135 | } |
| 136 | |
| 137 | return result, nil |
| 138 | case commonv1.SizeScope_TINY: |
| 139 | // Validate data of direct piece. |
| 140 | if !peer.Task.CanReuseDirectPiece() { |