(cbuf *z.Buffer, shardIdx int)
| 144 | } |
| 145 | |
| 146 | func (m *mapper) writeMapEntriesToFile(cbuf *z.Buffer, shardIdx int) { |
| 147 | defer func() { |
| 148 | m.shards[shardIdx].mu.Unlock() // Locked by caller. |
| 149 | if err := cbuf.Release(); err != nil { |
| 150 | glog.Warningf("error in releasing buffer: %v", err) |
| 151 | } |
| 152 | }() |
| 153 | |
| 154 | cbuf.SortSlice(func(ls, rs []byte) bool { |
| 155 | lhs := MapEntry(ls) |
| 156 | rhs := MapEntry(rs) |
| 157 | return less(lhs, rhs) |
| 158 | }) |
| 159 | |
| 160 | f, err := m.openOutputFile(shardIdx) |
| 161 | x.Check(err) |
| 162 | |
| 163 | defer func() { |
| 164 | x.Check(f.Sync()) |
| 165 | x.Check(f.Close()) |
| 166 | }() |
| 167 | |
| 168 | w := s2.NewWriter(f) |
| 169 | defer func() { |
| 170 | x.Check(w.Close()) |
| 171 | }() |
| 172 | |
| 173 | // Create partition keys for the map file. |
| 174 | header := &pb.MapHeader{ |
| 175 | PartitionKeys: [][]byte{}, |
| 176 | } |
| 177 | |
| 178 | var bufSize int64 |
| 179 | if err := cbuf.SliceIterate(func(slice []byte) error { |
| 180 | me := MapEntry(slice) |
| 181 | bufSize += int64(4 + len(me)) |
| 182 | if bufSize < m.opt.PartitionBufSize { |
| 183 | return nil |
| 184 | } |
| 185 | sz := len(header.PartitionKeys) |
| 186 | if sz > 0 && bytes.Equal(me.Key(), header.PartitionKeys[sz-1]) { |
| 187 | // We already have this key. |
| 188 | return nil |
| 189 | } |
| 190 | header.PartitionKeys = append(header.PartitionKeys, me.Key()) |
| 191 | bufSize = 0 |
| 192 | return nil |
| 193 | }); err != nil { |
| 194 | glog.Errorf("error while iterating over buf: %v", err) |
| 195 | x.Check(err) |
| 196 | } |
| 197 | |
| 198 | // Write the header to the map file. |
| 199 | headerBuf, err := proto.Marshal(header) |
| 200 | x.Check(err) |
| 201 | lenBuf := make([]byte, 4) |
| 202 | binary.BigEndian.PutUint32(lenBuf, uint32(len(headerBuf))) |
| 203 | x.Check2(w.Write(lenBuf)) |
no test coverage detected