(ctx context.Context, kvs chan *pb.KVS)
| 58 | } |
| 59 | |
| 60 | func batchAndProposeKeyValues(ctx context.Context, kvs chan *pb.KVS) error { |
| 61 | glog.Infoln("Receiving predicate. Batching and proposing key values") |
| 62 | n := groups().Node |
| 63 | proposal := &pb.Proposal{} |
| 64 | size := 0 |
| 65 | var pk x.ParsedKey |
| 66 | |
| 67 | for kvPayload := range kvs { |
| 68 | buf := z.NewBufferSlice(kvPayload.GetData()) |
| 69 | err := buf.SliceIterate(func(s []byte) error { |
| 70 | kv := &bpb.KV{} |
| 71 | x.Check(proto.Unmarshal(s, kv)) |
| 72 | if len(pk.Attr) == 0 { |
| 73 | // This only happens once. |
| 74 | var err error |
| 75 | pk, err = x.Parse(kv.Key) |
| 76 | if err != nil { |
| 77 | return errors.Errorf("while parsing kv: %+v, got error: %v", kv, err) |
| 78 | } |
| 79 | |
| 80 | if !pk.IsSchema() { |
| 81 | return errors.Errorf("Expecting first key to be schema key: %+v", kv) |
| 82 | } |
| 83 | |
| 84 | // Delete on all nodes. Remove the schema at timestamp kv.Version-1 and set it at |
| 85 | // kv.Version. kv.Version will be the TxnTs of the predicate move. |
| 86 | p := &pb.Proposal{CleanPredicate: pk.Attr, StartTs: kv.Version - 1} |
| 87 | glog.Infof("Predicate being received: %v", pk.Attr) |
| 88 | if err := n.proposeAndWait(ctx, p); err != nil { |
| 89 | glog.Errorf("Error while cleaning predicate %v %v\n", pk.Attr, err) |
| 90 | return err |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | proposal.Kv = append(proposal.Kv, kv) |
| 95 | size += len(kv.Key) + len(kv.Value) |
| 96 | if size >= 32<<20 { // 32 MB |
| 97 | if err := n.proposeAndWait(ctx, proposal); err != nil { |
| 98 | return err |
| 99 | } |
| 100 | proposal = &pb.Proposal{} |
| 101 | size = 0 |
| 102 | } |
| 103 | return nil |
| 104 | }) |
| 105 | if err != nil { |
| 106 | return err |
| 107 | } |
| 108 | } |
| 109 | if size > 0 { |
| 110 | // Propose remaining keys. |
| 111 | if err := n.proposeAndWait(ctx, proposal); err != nil { |
| 112 | return err |
| 113 | } |
| 114 | } |
| 115 | return nil |
| 116 | } |
| 117 |
no test coverage detected