MCPcopy
hub / github.com/dgraph-io/dgraph / writeMapEntriesToFile

Method writeMapEntriesToFile

dgraph/cmd/bulk/mapper.go:146–218  ·  view source on GitHub ↗
(cbuf *z.Buffer, shardIdx int)

Source from the content-addressed store, hash-verified

144}
145
146func (m *mapper) writeMapEntriesToFile(cbuf *z.Buffer, shardIdx int) {
147 defer func() {
148 m.shards[shardIdx].mu.Unlock() // Locked by caller.
149 if err := cbuf.Release(); err != nil {
150 glog.Warningf("error in releasing buffer: %v", err)
151 }
152 }()
153
154 cbuf.SortSlice(func(ls, rs []byte) bool {
155 lhs := MapEntry(ls)
156 rhs := MapEntry(rs)
157 return less(lhs, rhs)
158 })
159
160 f, err := m.openOutputFile(shardIdx)
161 x.Check(err)
162
163 defer func() {
164 x.Check(f.Sync())
165 x.Check(f.Close())
166 }()
167
168 w := s2.NewWriter(f)
169 defer func() {
170 x.Check(w.Close())
171 }()
172
173 // Create partition keys for the map file.
174 header := &pb.MapHeader{
175 PartitionKeys: [][]byte{},
176 }
177
178 var bufSize int64
179 if err := cbuf.SliceIterate(func(slice []byte) error {
180 me := MapEntry(slice)
181 bufSize += int64(4 + len(me))
182 if bufSize < m.opt.PartitionBufSize {
183 return nil
184 }
185 sz := len(header.PartitionKeys)
186 if sz > 0 && bytes.Equal(me.Key(), header.PartitionKeys[sz-1]) {
187 // We already have this key.
188 return nil
189 }
190 header.PartitionKeys = append(header.PartitionKeys, me.Key())
191 bufSize = 0
192 return nil
193 }); err != nil {
194 glog.Errorf("error while iterating over buf: %v", err)
195 x.Check(err)
196 }
197
198 // Write the header to the map file.
199 headerBuf, err := proto.Marshal(header)
200 x.Check(err)
201 lenBuf := make([]byte, 4)
202 binary.BigEndian.PutUint32(lenBuf, uint32(len(headerBuf)))
203 x.Check2(w.Write(lenBuf))

Callers 1

runMethod · 0.95

Calls 13

openOutputFileMethod · 0.95
WriteMethod · 0.95
CheckFunction · 0.92
Check2Function · 0.92
MapEntryTypeAlias · 0.85
ReleaseMethod · 0.80
WarningfMethod · 0.80
lessFunction · 0.70
SyncMethod · 0.65
CloseMethod · 0.65
UnlockMethod · 0.45
KeyMethod · 0.45

Tested by

no test coverage detected