MCPcopy
hub / github.com/dgraph-io/dgraph / newLoader

Function newLoader

dgraph/cmd/bulk/loader.go:148–221  ·  view source on GitHub ↗
(opt *BulkOptions, precomputedWriteTs uint64)

Source from the content-addressed store, hash-verified

146}
147
148func newLoader(opt *BulkOptions, precomputedWriteTs uint64) *loader {
149 if opt == nil {
150 log.Fatalf("Cannot create loader with nil options.")
151 }
152
153 var zero *grpc.ClientConn
154 if opt.ZeroAddr != "" {
155 fmt.Printf("Connecting to zero at %s\n", opt.ZeroAddr)
156
157 tlsConf, err := x.LoadClientTLSConfigForInternalPort(Bulk.Conf)
158 x.Check(err)
159 dialOpts := []grpc.DialOption{}
160 if tlsConf != nil {
161 dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConf)))
162 } else {
163 dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
164 }
165 zero, err = grpc.NewClient(opt.ZeroAddr, dialOpts...)
166 x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.ZeroAddr)
167 }
168
169 var dg *dgo.Dgraph
170 if opt.ConnStr != "" {
171 fmt.Printf("Connecting to alpha at %s\n", opt.ConnStr)
172
173 var err error
174 dg, err = dgo.Open(opt.ConnStr)
175 x.Checkf(err, "Unable to connect to alpha, Is it running at %s?", opt.ConnStr)
176 }
177
178 var errLog *errorLogger
179 if opt.LogErrors {
180 if !opt.IgnoreErrors {
181 fmt.Fprintln(os.Stderr, "Warning: --log_errors requires --ignore_errors to be set")
182 }
183 var err error
184 errLog, err = newErrorLogger(opt.ErrorLogPath)
185 x.Checkf(err, "Unable to create error log file at %s", opt.ErrorLogPath)
186 fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath)
187 }
188
189 writeTs := precomputedWriteTs
190 if writeTs == 0 {
191 writeTs = getWriteTimestamp(zero, dg)
192 }
193 st := &state{
194 opt: opt,
195 prog: newProgress(),
196 shards: newShardMap(opt.MapShards),
197 // Lots of gz readers, so not much channel buffer needed.
198 readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines),
199 writeTs: writeTs,
200 namespaces: &sync.Map{},
201 errorLog: errLog,
202 }
203 var parsedSchema *schema.ParsedSchema
204 if !opt.SkipMapPhase {
205 parsedSchema = readSchema(opt)

Callers 1

RunBulkLoader · Function · 0.85

Calls 13

Check · Function · 0.92
Checkf · Function · 0.92
newErrorLogger · Function · 0.85
getWriteTimestamp · Function · 0.85
newProgress · Function · 0.85
newShardMap · Function · 0.85
newSchemaStore · Function · 0.85
newMapper · Function · 0.85
Fatalf · Method · 0.80
report · Method · 0.80
readSchema · Function · 0.70

Tested by

no test coverage detected