MCPcopy
hub / github.com/dgraph-io/dgraph / batchAndProposeKeyValues

Function batchAndProposeKeyValues

worker/predicate_move.go:60–116  ·  view source on GitHub ↗
(ctx context.Context, kvs chan *pb.KVS)

Source from the content-addressed store, hash-verified

58}
59
60func batchAndProposeKeyValues(ctx context.Context, kvs chan *pb.KVS) error {
61 glog.Infoln("Receiving predicate. Batching and proposing key values")
62 n := groups().Node
63 proposal := &pb.Proposal{}
64 size := 0
65 var pk x.ParsedKey
66
67 for kvPayload := range kvs {
68 buf := z.NewBufferSlice(kvPayload.GetData())
69 err := buf.SliceIterate(func(s []byte) error {
70 kv := &bpb.KV{}
71 x.Check(proto.Unmarshal(s, kv))
72 if len(pk.Attr) == 0 {
73 // This only happens once.
74 var err error
75 pk, err = x.Parse(kv.Key)
76 if err != nil {
77 return errors.Errorf("while parsing kv: %+v, got error: %v", kv, err)
78 }
79
80 if !pk.IsSchema() {
81 return errors.Errorf("Expecting first key to be schema key: %+v", kv)
82 }
83
84 // Delete on all nodes. Remove the schema at timestamp kv.Version-1 and set it at
85 // kv.Version. kv.Version will be the TxnTs of the predicate move.
86 p := &pb.Proposal{CleanPredicate: pk.Attr, StartTs: kv.Version - 1}
87 glog.Infof("Predicate being received: %v", pk.Attr)
88 if err := n.proposeAndWait(ctx, p); err != nil {
89 glog.Errorf("Error while cleaning predicate %v %v\n", pk.Attr, err)
90 return err
91 }
92 }
93
94 proposal.Kv = append(proposal.Kv, kv)
95 size += len(kv.Key) + len(kv.Value)
96 if size >= 32<<20 { // 32 MB
97 if err := n.proposeAndWait(ctx, proposal); err != nil {
98 return err
99 }
100 proposal = &pb.Proposal{}
101 size = 0
102 }
103 return nil
104 })
105 if err != nil {
106 return err
107 }
108 }
109 if size > 0 {
110 // Propose remaining keys.
111 if err := n.proposeAndWait(ctx, proposal); err != nil {
112 return err
113 }
114 }
115 return nil
116}
117

Callers 1

ReceivePredicateMethod · 0.85

Calls 8

IsSchemaMethod · 0.95
CheckFunction · 0.92
ParseFunction · 0.92
groupsFunction · 0.85
GetDataMethod · 0.80
InfofMethod · 0.80
ErrorfMethod · 0.45
proposeAndWaitMethod · 0.45

Tested by

no test coverage detected