(opt *BulkOptions, precomputedWriteTs uint64)
| 146 | } |
| 147 | |
| 148 | func newLoader(opt *BulkOptions, precomputedWriteTs uint64) *loader { |
| 149 | if opt == nil { |
| 150 | log.Fatalf("Cannot create loader with nil options.") |
| 151 | } |
| 152 | |
| 153 | var zero *grpc.ClientConn |
| 154 | if opt.ZeroAddr != "" { |
| 155 | fmt.Printf("Connecting to zero at %s\n", opt.ZeroAddr) |
| 156 | |
| 157 | tlsConf, err := x.LoadClientTLSConfigForInternalPort(Bulk.Conf) |
| 158 | x.Check(err) |
| 159 | dialOpts := []grpc.DialOption{} |
| 160 | if tlsConf != nil { |
| 161 | dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConf))) |
| 162 | } else { |
| 163 | dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) |
| 164 | } |
| 165 | zero, err = grpc.NewClient(opt.ZeroAddr, dialOpts...) |
| 166 | x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.ZeroAddr) |
| 167 | } |
| 168 | |
| 169 | var dg *dgo.Dgraph |
| 170 | if opt.ConnStr != "" { |
| 171 | fmt.Printf("Connecting to alpha at %s\n", opt.ConnStr) |
| 172 | |
| 173 | var err error |
| 174 | dg, err = dgo.Open(opt.ConnStr) |
| 175 | x.Checkf(err, "Unable to connect to alpha, Is it running at %s?", opt.ConnStr) |
| 176 | } |
| 177 | |
| 178 | var errLog *errorLogger |
| 179 | if opt.LogErrors { |
| 180 | if !opt.IgnoreErrors { |
| 181 | fmt.Fprintln(os.Stderr, "Warning: --log_errors requires --ignore_errors to be set") |
| 182 | } |
| 183 | var err error |
| 184 | errLog, err = newErrorLogger(opt.ErrorLogPath) |
| 185 | x.Checkf(err, "Unable to create error log file at %s", opt.ErrorLogPath) |
| 186 | fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath) |
| 187 | } |
| 188 | |
| 189 | writeTs := precomputedWriteTs |
| 190 | if writeTs == 0 { |
| 191 | writeTs = getWriteTimestamp(zero, dg) |
| 192 | } |
| 193 | st := &state{ |
| 194 | opt: opt, |
| 195 | prog: newProgress(), |
| 196 | shards: newShardMap(opt.MapShards), |
| 197 | // Lots of gz readers, so not much channel buffer needed. |
| 198 | readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines), |
| 199 | writeTs: writeTs, |
| 200 | namespaces: &sync.Map{}, |
| 201 | errorLog: errLog, |
| 202 | } |
| 203 | var parsedSchema *schema.ParsedSchema |
| 204 | if !opt.SkipMapPhase { |
| 205 | parsedSchema = readSchema(opt) |
no test coverage detected