ExportOverNetwork sends export requests to all the known groups.
(ctx context.Context, input *pb.ExportRequest)
| 1007 | |
| 1008 | // ExportOverNetwork sends export requests to all the known groups. |
| 1009 | func ExportOverNetwork(ctx context.Context, input *pb.ExportRequest) (ExportedFiles, error) { |
| 1010 | // If we haven't even had a single membership update, don't run export. |
| 1011 | if err := x.HealthCheck(); err != nil { |
| 1012 | glog.Errorf("Rejecting export request due to health check error: %v\n", err) |
| 1013 | return nil, err |
| 1014 | } |
| 1015 | // Get ReadTs from zero and wait for stream to catch up. |
| 1016 | ts, err := Timestamps(ctx, &pb.Num{ReadOnly: true}) |
| 1017 | if err != nil { |
| 1018 | glog.Errorf("Unable to retrieve readonly ts for export: %v\n", err) |
| 1019 | return nil, err |
| 1020 | } |
| 1021 | readTs := ts.ReadOnly |
| 1022 | glog.Infof("Got readonly ts from Zero: %d\n", readTs) |
| 1023 | |
| 1024 | // Let's first collect all groups. |
| 1025 | gids := groups().KnownGroups() |
| 1026 | glog.Infof("Requesting export for groups: %v\n", gids) |
| 1027 | |
| 1028 | type filesAndError struct { |
| 1029 | ExportedFiles |
| 1030 | error |
| 1031 | } |
| 1032 | ch := make(chan filesAndError, len(gids)) |
| 1033 | for _, gid := range gids { |
| 1034 | go func(group uint32) { |
| 1035 | req := &pb.ExportRequest{ |
| 1036 | GroupId: group, |
| 1037 | ReadTs: readTs, |
| 1038 | UnixTs: time.Now().Unix(), |
| 1039 | Format: input.Format, |
| 1040 | Namespace: input.Namespace, |
| 1041 | |
| 1042 | Destination: input.Destination, |
| 1043 | AccessKey: input.AccessKey, |
| 1044 | SecretKey: input.SecretKey, |
| 1045 | SessionToken: input.SessionToken, |
| 1046 | Anonymous: input.Anonymous, |
| 1047 | } |
| 1048 | files, err := handleExportOverNetwork(ctx, req) |
| 1049 | ch <- filesAndError{files, err} |
| 1050 | }(gid) |
| 1051 | } |
| 1052 | |
| 1053 | var allFiles ExportedFiles |
| 1054 | for range gids { |
| 1055 | pair := <-ch |
| 1056 | if pair.error != nil { |
| 1057 | rerr := errors.Wrapf(pair.error, "Export failed at readTs %d", readTs) |
| 1058 | glog.Errorln(rerr) |
| 1059 | return nil, rerr |
| 1060 | } |
| 1061 | allFiles = append(allFiles, pair.ExportedFiles...) |
| 1062 | } |
| 1063 | |
| 1064 | glog.Infof("Export at readTs %d DONE", readTs) |
| 1065 | return allFiles, nil |
| 1066 | } |
no test coverage detected