UploadPersistentCacheTaskStarted uploads the metadata of the persistent cache task started.
(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskStartedRequest)
| 4041 | |
| 4042 | // UploadPersistentCacheTaskStarted uploads the metadata of the persistent cache task started. |
| 4043 | func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskStartedRequest) error { |
| 4044 | if v.persistentCacheResource == nil { |
| 4045 | return status.Error(codes.FailedPrecondition, "redis is not enabled") |
| 4046 | } |
| 4047 | |
| 4048 | log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId()) |
| 4049 | log.Info("upload persistent cache task started") |
| 4050 | |
| 4051 | host, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.GetHostId()) |
| 4052 | if !loaded { |
| 4053 | log.Error("host not found") |
| 4054 | return status.Errorf(codes.NotFound, "host %s not found", req.GetHostId()) |
| 4055 | } |
| 4056 | |
| 4057 | // Handle task with task started request, new task and store it. |
| 4058 | task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId()) |
| 4059 | if loaded && !task.FSM.Can(persistentcache.TaskEventUpload) { |
| 4060 | log.Errorf("persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current()) |
| 4061 | return status.Errorf(codes.AlreadyExists, "persistent cache task %s is %s cannot upload", task.ID, task.FSM.Current()) |
| 4062 | } |
| 4063 | |
| 4064 | task = persistentcache.NewTask(req.GetTaskId(), req.GetTag(), req.GetApplication(), persistentcache.TaskStatePending, req.GetPersistentReplicaCount(), |
| 4065 | req.GetPieceLength(), req.GetContentLength(), req.GetPieceCount(), req.GetTtl().AsDuration(), time.Now(), time.Now(), log) |
| 4066 | |
| 4067 | if err := task.FSM.Event(ctx, persistentcache.TaskEventUpload); err != nil { |
| 4068 | log.Errorf("task fsm event failed: %s", err.Error()) |
| 4069 | return status.Error(codes.Internal, err.Error()) |
| 4070 | } |
| 4071 | |
| 4072 | if err := v.persistentCacheResource.TaskManager().Store(ctx, task); err != nil { |
| 4073 | log.Errorf("store persistent cache task %s error %s", task.ID, err) |
| 4074 | return status.Error(codes.Internal, err.Error()) |
| 4075 | } |
| 4076 | |
| 4077 | // Handle peer with task started request, new peer and store it. |
| 4078 | if peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId()); loaded { |
| 4079 | log.Error("persistent cache peer already exists") |
| 4080 | return status.Errorf(codes.AlreadyExists, "persistent cache peer %s already exists", peer.ID) |
| 4081 | } |
| 4082 | |
| 4083 | peer := persistentcache.NewPeer(req.GetPeerId(), persistentcache.PeerStatePending, true, bitset.New(uint(req.GetPieceCount())), nil, task, host, 0, time.Now(), time.Now(), log) |
| 4084 | |
| 4085 | if err := peer.FSM.Event(ctx, persistentcache.PeerEventUpload); err != nil { |
| 4086 | log.Errorf("peer fsm event failed: %s", err.Error()) |
| 4087 | return status.Error(codes.Internal, err.Error()) |
| 4088 | } |
| 4089 | |
| 4090 | if err := v.persistentCacheResource.PeerManager().Store(ctx, peer); err != nil { |
| 4091 | log.Errorf("store persistent cache peer %s error %s", peer.ID, err) |
| 4092 | return status.Error(codes.Internal, err.Error()) |
| 4093 | } |
| 4094 | |
| 4095 | return nil |
| 4096 | } |
| 4097 | |
| 4098 | // UploadPersistentCacheTaskFinished uploads the metadata of the persistent cache task finished. |
| 4099 | func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFinishedRequest) (*commonv2.PersistentCacheTask, error) { |
nothing calls this directly
no test coverage detected