MCPcopy
hub / github.com/dgraph-io/dgraph / run

Method run

dgraph/cmd/bulk/mapper.go:220–292  ·  view source on GitHub ↗
(inputFormat chunker.InputFormat)

Source from the content-addressed store, hash-verified

218}
219
220func (m *mapper) run(inputFormat chunker.InputFormat) {
221 chunk := chunker.NewChunker(inputFormat, 1000)
222 nquads := chunk.NQuads()
223 go func() {
224 for chunkMeta := range m.readerChunkCh {
225 if err := chunk.Parse(chunkMeta.buf); err != nil {
226 atomic.AddInt64(&m.prog.errCount, 1)
227 if m.errorLog != nil {
228 m.errorLog.Log(chunkMeta.filename, err, "")
229 }
230 if !m.opt.IgnoreErrors {
231 x.Check(err)
232 }
233 }
234 }
235 aclOnce.Do(func() {
236 if m.opt.Namespace != math.MaxUint64 && m.opt.Namespace != x.RootNamespace {
237 // Insert ACL related RDFs force uploading the data into non-galaxy namespace.
238 aclNquads := make([]*api.NQuad, 0)
239 aclNquads = append(aclNquads, acl.CreateGroupNQuads(x.SuperAdminId)...)
240 aclNquads = append(aclNquads, acl.CreateUserNQuads(x.GrootId, "password")...)
241 aclNquads = append(aclNquads, &api.NQuad{
242 Subject: "_:newuser",
243 Predicate: "dgraph.user.group",
244 ObjectId: "_:newgroup",
245 })
246 nquads.Push(aclNquads...)
247 }
248 })
249 nquads.Flush()
250 }()
251
252 for nqs := range nquads.Ch() {
253 for _, nq := range nqs {
254 if err := facets.SortAndValidate(nq.Facets); err != nil {
255 atomic.AddInt64(&m.prog.errCount, 1)
256 if m.errorLog != nil {
257 m.errorLog.Log("<facet_validation>", err, fmt.Sprintf("subject=%s predicate=%s", nq.Subject, nq.Predicate))
258 }
259 if !m.opt.IgnoreErrors {
260 x.Check(err)
261 }
262 }
263
264 m.processNQuad(dql.NQuad{NQuad: nq})
265 atomic.AddInt64(&m.prog.nquadCount, 1)
266 }
267
268 for i := range m.shards {
269 sh := &m.shards[i]
270 if uint64(sh.cbuf.LenNoPadding()) >= m.opt.MapBufSize {
271 sh.mu.Lock() // One write at a time.
272 go m.writeMapEntriesToFile(sh.cbuf, i)
273 // Clear the entries and encodedSize for the next batch.
274 // Proactively allocate 32 slots to bootstrap the entries slice.
275 sh.cbuf = newMapperBuffer(m.opt)
276 }
277 }

Callers 1

mapStageMethod · 0.45

Calls 15

NQuadsMethod · 0.95
ParseMethod · 0.95
processNQuadMethod · 0.95
writeMapEntriesToFileMethod · 0.95
NewChunkerFunction · 0.92
CheckFunction · 0.92
CreateGroupNQuadsFunction · 0.92
CreateUserNQuadsFunction · 0.92
SortAndValidateFunction · 0.92
newMapperBufferFunction · 0.85
LogMethod · 0.80
ChMethod · 0.80

Tested by

no test coverage detected