(buf *z.Buffer)
| 162 | } |
| 163 | |
| 164 | func (m *mapper) writeToDisk(buf *z.Buffer) error { |
| 165 | defer func() { |
| 166 | if err := buf.Release(); err != nil { |
| 167 | glog.Warningf("error in releasing buffer: %v", err) |
| 168 | } |
| 169 | }() |
| 170 | |
| 171 | if buf.IsEmpty() { |
| 172 | return nil |
| 173 | } |
| 174 | |
| 175 | f, err := m.newMapFile() |
| 176 | if err != nil { |
| 177 | return errors.Wrap(err, "openOutputFile") |
| 178 | } |
| 179 | defer func() { |
| 180 | if err := f.Close(); err != nil { |
| 181 | glog.Warningf("error while closing fd: %v", err) |
| 182 | } |
| 183 | }() |
| 184 | |
| 185 | // Create partition keys for the map file. |
| 186 | header := &pb.MapHeader{PartitionKeys: [][]byte{}} |
| 187 | var bufSize int |
| 188 | err = buf.SliceIterate(func(slice []byte) error { |
| 189 | bufSize += 4 + len(slice) |
| 190 | if bufSize < partitionBufSz { |
| 191 | return nil |
| 192 | } |
| 193 | sz := len(header.PartitionKeys) |
| 194 | me := mapEntry(slice) |
| 195 | if sz > 0 && bytes.Equal(me.Key(), header.PartitionKeys[sz-1]) { |
| 196 | // We already have this key. |
| 197 | return nil |
| 198 | } |
| 199 | header.PartitionKeys = append(header.PartitionKeys, me.Key()) |
| 200 | bufSize = 0 |
| 201 | return nil |
| 202 | }) |
| 203 | if err != nil { |
| 204 | glog.Errorf("error in iterating over buffer: %v", err) |
| 205 | return err |
| 206 | } |
| 207 | |
| 208 | // Write the header to the map file. |
| 209 | headerBuf, err := proto.Marshal(header) |
| 210 | x.Check(err) |
| 211 | var lenBuf [4]byte |
| 212 | binary.BigEndian.PutUint32(lenBuf[:], uint32(len(headerBuf))) |
| 213 | |
| 214 | w := s2.NewWriter(f) |
| 215 | x.Check2(w.Write(lenBuf[:])) |
| 216 | x.Check2(w.Write(headerBuf)) |
| 217 | x.Check(err) |
| 218 | |
| 219 | sizeBuf := make([]byte, binary.MaxVarintLen64) |
| 220 | err = buf.SliceIterate(func(slice []byte) error { |
| 221 | n := binary.PutUvarint(sizeBuf, uint64(len(slice))) |
no test coverage detected