StatFile provides the detailed distribution status of files across peers. It is a blocking call that first queries the file/directory entries and then queries all peers to collect each file's download state across the network. The response includes the file status on each peer.
(ctx context.Context, req *schedulerv2.StatFileRequest)
| 4839 | // all peers to collect the file's download state across the network. |
| 4840 | // The response includes the file status on each peer. |
| 4841 | func (v *V2) StatFile(ctx context.Context, req *schedulerv2.StatFileRequest) (*schedulerv2.StatFileResponse, error) { |
| 4842 | log := logger.WithStatFile(req.Url) |
| 4843 | |
| 4844 | if req.ConcurrentPeerCount == nil { |
| 4845 | concurrentPeerCount := int64(managertypes.DefaultPreheatConcurrentPeerCount) |
| 4846 | req.ConcurrentPeerCount = &concurrentPeerCount |
| 4847 | } |
| 4848 | |
| 4849 | if len(req.FilteredQueryParams) == 0 { |
| 4850 | req.FilteredQueryParams = http.DefaultFilteredQueryParams |
| 4851 | } |
| 4852 | |
| 4853 | if req.Timeout == nil { |
| 4854 | req.Timeout = durationpb.New(managertypes.DefaultJobTimeout) |
| 4855 | } |
| 4856 | |
| 4857 | ctx, cancel := context.WithTimeout(ctx, req.GetTimeout().AsDuration()) |
| 4858 | defer cancel() |
| 4859 | |
| 4860 | var urls []string |
| 4861 | if strings.HasSuffix(req.GetUrl(), "/") { |
| 4862 | listResp, err := v.job.ListTaskEntries(ctx, &internaljob.ListTaskEntriesRequest{ |
| 4863 | TaskID: idgen.TaskIDV2ByURLBased(req.GetUrl(), req.PieceLength, req.GetTag(), req.GetApplication(), req.FilteredQueryParams, ""), |
| 4864 | Url: req.GetUrl(), |
| 4865 | Timeout: req.GetTimeout(), |
| 4866 | Header: req.GetHeader(), |
| 4867 | CertificateChain: req.GetCertificateChain(), |
| 4868 | ObjectStorage: req.GetObjectStorage(), |
| 4869 | Hdfs: req.GetHdfs(), |
| 4870 | }, log) |
| 4871 | if err != nil { |
| 4872 | return nil, status.Errorf(codes.InvalidArgument, "failed to list task entries: %s", err) |
| 4873 | } |
| 4874 | |
| 4875 | if len(listResp.Entries) == 0 { |
| 4876 | return nil, status.Errorf(codes.InvalidArgument, "stat url is a directory, but with no entry: %s", req.GetUrl()) |
| 4877 | } |
| 4878 | |
| 4879 | for _, entry := range listResp.Entries { |
| 4880 | if entry.IsDir { |
| 4881 | continue |
| 4882 | } |
| 4883 | |
| 4884 | urls = append(urls, entry.Url) |
| 4885 | } |
| 4886 | } else { |
| 4887 | urls = append(urls, req.GetUrl()) |
| 4888 | } |
| 4889 | |
| 4890 | resp := &schedulerv2.StatFileResponse{ |
| 4891 | Peers: make([]*schedulerv2.PeerFile, 0), |
| 4892 | } |
| 4893 | |
| 4894 | var mu sync.Mutex |
| 4895 | peers := map[string]*schedulerv2.PeerFile{} |
| 4896 | eg, ctx := errgroup.WithContext(ctx) |
| 4897 | for _, url := range urls { |
| 4898 | eg.Go(func() error { |