MutateOverNetwork determines, from the group config, which group should run each mutation and sends the mutation to that group's instance.
(ctx context.Context, m *pb.Mutations)
| 753 | // MutateOverNetwork checks which group should be running the mutations |
| 754 | // according to the group config and sends it to that instance. |
| 755 | func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) { |
| 756 | ctx, span := otel.Tracer("").Start(ctx, "worker.MutateOverNetwork") |
| 757 | defer span.End() |
| 758 | |
| 759 | tctx := &api.TxnContext{StartTs: m.StartTs} |
| 760 | if err := verifyTypes(ctx, m); err != nil { |
| 761 | return tctx, err |
| 762 | } |
| 763 | mutationMap, err := populateMutationMap(m) |
| 764 | if err != nil { |
| 765 | return tctx, err |
| 766 | } |
| 767 | |
| 768 | resCh := make(chan res, len(mutationMap)) |
| 769 | for gid, mu := range mutationMap { |
| 770 | if gid == 0 { |
| 771 | span.AddEvent("State information", trace.WithAttributes( |
| 772 | attribute.String("state", groups().state.String()))) |
| 773 | span.AddEvent("Group id zero for mutation", trace.WithAttributes( |
| 774 | attribute.String("mutation", mu.String()))) |
| 775 | return tctx, errNonExistentTablet |
| 776 | } |
| 777 | mu.StartTs = m.StartTs |
| 778 | go proposeOrSend(ctx, gid, mu, resCh) |
| 779 | } |
| 780 | |
| 781 | // Wait for all the goroutines to reply back. |
| 782 | // We return if an error was returned or the parent called ctx.Done() |
| 783 | var e error |
| 784 | for range mutationMap { |
| 785 | res := <-resCh |
| 786 | if res.err != nil { |
| 787 | e = res.err |
| 788 | } |
| 789 | if res.ctx != nil { |
| 790 | tctx.Keys = append(tctx.Keys, res.ctx.Keys...) |
| 791 | tctx.Preds = append(tctx.Preds, res.ctx.Preds...) |
| 792 | } |
| 793 | } |
| 794 | close(resCh) |
| 795 | return tctx, e |
| 796 | } |
| 797 | |
| 798 | func verifyTypes(ctx context.Context, m *pb.Mutations) error { |
| 799 | // Create a set of all the predicates included in this schema request. |
no test coverage detected