| 100 | } |
| 101 | |
| 102 | func (c *countIndexer) writeIndex(buf *z.Buffer) { |
| 103 | defer func() { |
| 104 | c.wg.Done() |
| 105 | if err := buf.Release(); err != nil { |
| 106 | glog.Warningf("error in releasing buffer: %v", err) |
| 107 | } |
| 108 | |
| 109 | }() |
| 110 | if buf.IsEmpty() { |
| 111 | return |
| 112 | } |
| 113 | |
| 114 | streamId := atomic.AddUint32(&c.streamId, 1) |
| 115 | buf.SortSlice(func(ls, rs []byte) bool { |
| 116 | left := countEntry(ls) |
| 117 | right := countEntry(rs) |
| 118 | return left.less(right) |
| 119 | }) |
| 120 | |
| 121 | tmp, _ := buf.Slice(buf.StartOffset()) |
| 122 | lastCe := countEntry(tmp) |
| 123 | { |
| 124 | pk, err := x.Parse(lastCe.Key()) |
| 125 | x.Check(err) |
| 126 | fmt.Printf("Writing count index for %q rev=%v\n", pk.Attr, pk.IsReverse()) |
| 127 | } |
| 128 | |
| 129 | alloc := z.NewAllocator(8<<20, "CountIndexer.WriteIndex") |
| 130 | defer alloc.Release() |
| 131 | |
| 132 | var pl pb.PostingList |
| 133 | encoder := codec.Encoder{BlockSize: 256, Alloc: alloc} |
| 134 | |
| 135 | outBuf := z.NewBuffer(5<<20, "CountIndexer.Buffer.WriteIndex") |
| 136 | defer func() { |
| 137 | if err := outBuf.Release(); err != nil { |
| 138 | glog.Warningf("error in releasing buffer: %v", err) |
| 139 | } |
| 140 | }() |
| 141 | encode := func() { |
| 142 | pl.Pack = encoder.Done() |
| 143 | if codec.ExactLen(pl.Pack) == 0 { |
| 144 | return |
| 145 | } |
| 146 | |
| 147 | kv := posting.MarshalPostingList(&pl, nil) |
| 148 | kv.Key = append([]byte{}, lastCe.Key()...) |
| 149 | kv.Version = c.state.writeTs |
| 150 | kv.StreamId = streamId |
| 151 | badger.KVToBuffer(kv, outBuf) |
| 152 | |
| 153 | alloc.Reset() |
| 154 | encoder = codec.Encoder{BlockSize: 256, Alloc: alloc} |
| 155 | pl.Reset() |
| 156 | |
| 157 | // flush out the buffer. |
| 158 | if outBuf.LenNoPadding() > 4<<20 { |
| 159 | x.Check(c.writer.Write(outBuf)) |