MCPcopy
hub / github.com/dragonflyoss/dragonfly / UploadPersistentCacheTaskStarted

Method UploadPersistentCacheTaskStarted

scheduler/service/service_v2.go:4043–4096  ·  view source on GitHub ↗

UploadPersistentCacheTaskStarted uploads the metadata of the persistent cache task started.

(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskStartedRequest)

Source from the content-addressed store, hash-verified

4041
4042// UploadPersistentCacheTaskStarted uploads the metadata of the persistent cache task started.
4043func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskStartedRequest) error {
4044 if v.persistentCacheResource == nil {
4045 return status.Error(codes.FailedPrecondition, "redis is not enabled")
4046 }
4047
4048 log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
4049 log.Info("upload persistent cache task started")
4050
4051 host, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.GetHostId())
4052 if !loaded {
4053 log.Error("host not found")
4054 return status.Errorf(codes.NotFound, "host %s not found", req.GetHostId())
4055 }
4056
4057 // Handle task with task started request, new task and store it.
4058 task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
4059 if loaded && !task.FSM.Can(persistentcache.TaskEventUpload) {
4060 log.Errorf("persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current())
4061 return status.Errorf(codes.AlreadyExists, "persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current())
4062 }
4063
4064 task = persistentcache.NewTask(req.GetTaskId(), req.GetTag(), req.GetApplication(), persistentcache.TaskStatePending, req.GetPersistentReplicaCount(),
4065 req.GetPieceLength(), req.GetContentLength(), req.GetPieceCount(), req.GetTtl().AsDuration(), time.Now(), time.Now(), log)
4066
4067 if err := task.FSM.Event(ctx, persistentcache.TaskEventUpload); err != nil {
4068 log.Errorf("task fsm event failed: %s", err.Error())
4069 return status.Error(codes.Internal, err.Error())
4070 }
4071
4072 if err := v.persistentCacheResource.TaskManager().Store(ctx, task); err != nil {
4073 log.Errorf("store persistent cache task %s error %s", task.ID, err)
4074 return status.Error(codes.Internal, err.Error())
4075 }
4076
4077 // Handle peer with task started request, new peer and store it.
4078 if peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId()); loaded {
4079 log.Error("persistent cache peer already exists")
4080 return status.Errorf(codes.AlreadyExists, "persistent cache peer %s already exists", peer.ID)
4081 }
4082
4083 peer := persistentcache.NewPeer(req.GetPeerId(), persistentcache.PeerStatePending, true, bitset.New(uint(req.GetPieceCount())), nil, task, host, 0, time.Now(), time.Now(), log)
4084
4085 if err := peer.FSM.Event(ctx, persistentcache.PeerEventUpload); err != nil {
4086 log.Errorf("peer fsm event failed: %s", err.Error())
4087 return status.Error(codes.Internal, err.Error())
4088 }
4089
4090 if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil {
4091 log.Errorf("store persistent cache peer %s error %s", peer.ID, err)
4092 return status.Error(codes.Internal, err.Error())
4093 }
4094
4095 return nil
4096}
4097
4098// UploadPersistentCacheTaskFinished uploads the metadata of the persistent cache task finished.
4099func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFinishedRequest) (*commonv2.PersistentCacheTask, error) {

Callers

nothing calls this directly

Calls 11

NewTaskFunction · 0.92
NewPeerFunction · 0.92
InfoMethod · 0.80
LoadMethod · 0.65
HostManagerMethod · 0.65
ErrorfMethod · 0.65
TaskManagerMethod · 0.65
GetApplicationMethod · 0.65
StoreMethod · 0.65
PeerManagerMethod · 0.65
ErrorMethod · 0.45

Tested by

no test coverage detected