(inputFormat chunker.InputFormat)
| 218 | } |
| 219 | |
| 220 | func (m *mapper) run(inputFormat chunker.InputFormat) { |
| 221 | chunk := chunker.NewChunker(inputFormat, 1000) |
| 222 | nquads := chunk.NQuads() |
| 223 | go func() { |
| 224 | for chunkMeta := range m.readerChunkCh { |
| 225 | if err := chunk.Parse(chunkMeta.buf); err != nil { |
| 226 | atomic.AddInt64(&m.prog.errCount, 1) |
| 227 | if m.errorLog != nil { |
| 228 | m.errorLog.Log(chunkMeta.filename, err, "") |
| 229 | } |
| 230 | if !m.opt.IgnoreErrors { |
| 231 | x.Check(err) |
| 232 | } |
| 233 | } |
| 234 | } |
| 235 | aclOnce.Do(func() { |
| 236 | if m.opt.Namespace != math.MaxUint64 && m.opt.Namespace != x.RootNamespace { |
| 237 | // Insert ACL-related RDFs when force uploading the data into a non-galaxy namespace. |
| 238 | aclNquads := make([]*api.NQuad, 0) |
| 239 | aclNquads = append(aclNquads, acl.CreateGroupNQuads(x.SuperAdminId)...) |
| 240 | aclNquads = append(aclNquads, acl.CreateUserNQuads(x.GrootId, "password")...) |
| 241 | aclNquads = append(aclNquads, &api.NQuad{ |
| 242 | Subject: "_:newuser", |
| 243 | Predicate: "dgraph.user.group", |
| 244 | ObjectId: "_:newgroup", |
| 245 | }) |
| 246 | nquads.Push(aclNquads...) |
| 247 | } |
| 248 | }) |
| 249 | nquads.Flush() |
| 250 | }() |
| 251 | |
| 252 | for nqs := range nquads.Ch() { |
| 253 | for _, nq := range nqs { |
| 254 | if err := facets.SortAndValidate(nq.Facets); err != nil { |
| 255 | atomic.AddInt64(&m.prog.errCount, 1) |
| 256 | if m.errorLog != nil { |
| 257 | m.errorLog.Log("<facet_validation>", err, fmt.Sprintf("subject=%s predicate=%s", nq.Subject, nq.Predicate)) |
| 258 | } |
| 259 | if !m.opt.IgnoreErrors { |
| 260 | x.Check(err) |
| 261 | } |
| 262 | } |
| 263 | |
| 264 | m.processNQuad(dql.NQuad{NQuad: nq}) |
| 265 | atomic.AddInt64(&m.prog.nquadCount, 1) |
| 266 | } |
| 267 | |
| 268 | for i := range m.shards { |
| 269 | sh := &m.shards[i] |
| 270 | if uint64(sh.cbuf.LenNoPadding()) >= m.opt.MapBufSize { |
| 271 | sh.mu.Lock() // One write at a time. |
| 272 | go m.writeMapEntriesToFile(sh.cbuf, i) |
| 273 | // Clear the entries and encodedSize for the next batch. |
| 274 | // Proactively allocate 32 slots to bootstrap the entries slice. |
| 275 | sh.cbuf = newMapperBuffer(m.opt) |
| 276 | } |
| 277 | } |
no test coverage detected