MCPcopy
hub / github.com/dragonflyoss/dragonfly / StatFile

Method StatFile

scheduler/service/service_v2.go:4841–4948  ·  view source on GitHub ↗

StatFile provides a detailed status of file distribution across peers. This is a blocking call that first queries the file or directory entries and then queries all peers to collect each file's download state across the network. The response includes the file status on each peer.

(ctx context.Context, req *schedulerv2.StatFileRequest)

Source from the content-addressed store, hash-verified

4839// all peers to collect the file's download state across the network.
4840// The response includes the file status on each peer.
4841func (v *V2) StatFile(ctx context.Context, req *schedulerv2.StatFileRequest) (*schedulerv2.StatFileResponse, error) {
4842 log := logger.WithStatFile(req.Url)
4843
4844 if req.ConcurrentPeerCount == nil {
4845 concurrentPeerCount := int64(managertypes.DefaultPreheatConcurrentPeerCount)
4846 req.ConcurrentPeerCount = &concurrentPeerCount
4847 }
4848
4849 if len(req.FilteredQueryParams) == 0 {
4850 req.FilteredQueryParams = http.DefaultFilteredQueryParams
4851 }
4852
4853 if req.Timeout == nil {
4854 req.Timeout = durationpb.New(managertypes.DefaultJobTimeout)
4855 }
4856
4857 ctx, cancel := context.WithTimeout(ctx, req.GetTimeout().AsDuration())
4858 defer cancel()
4859
4860 var urls []string
4861 if strings.HasSuffix(req.GetUrl(), "/") {
4862 listResp, err := v.job.ListTaskEntries(ctx, &internaljob.ListTaskEntriesRequest{
4863 TaskID: idgen.TaskIDV2ByURLBased(req.GetUrl(), req.PieceLength, req.GetTag(), req.GetApplication(), req.FilteredQueryParams, ""),
4864 Url: req.GetUrl(),
4865 Timeout: req.GetTimeout(),
4866 Header: req.GetHeader(),
4867 CertificateChain: req.GetCertificateChain(),
4868 ObjectStorage: req.GetObjectStorage(),
4869 Hdfs: req.GetHdfs(),
4870 }, log)
4871 if err != nil {
4872 return nil, status.Errorf(codes.InvalidArgument, "failed to list task entries: %s", err)
4873 }
4874
4875 if len(listResp.Entries) == 0 {
4876 return nil, status.Errorf(codes.InvalidArgument, "stat url is a directory, but with no entry: %s", req.GetUrl())
4877 }
4878
4879 for _, entry := range listResp.Entries {
4880 if entry.IsDir {
4881 continue
4882 }
4883
4884 urls = append(urls, entry.Url)
4885 }
4886 } else {
4887 urls = append(urls, req.GetUrl())
4888 }
4889
4890 resp := &schedulerv2.StatFileResponse{
4891 Peers: make([]*schedulerv2.PeerFile, 0),
4892 }
4893
4894 var mu sync.Mutex
4895 peers := map[string]*schedulerv2.PeerFile{}
4896 eg, ctx := errgroup.WithContext(ctx)
4897 for _, url := range urls {
4898 eg.Go(func() error {

Callers 1

TestServiceV2_StatFileFunction · 0.95

Calls 8

TaskIDV2ByURLBasedFunction · 0.92
HostIDV2Function · 0.92
ListTaskEntriesMethod · 0.65
GetApplicationMethod · 0.65
ErrorfMethod · 0.65
InfofMethod · 0.65
GetTaskMethod · 0.65
ErrorMethod · 0.45

Tested by 1

TestServiceV2_StatFileFunction · 0.76