(wg *sync.WaitGroup)
| 22 | ) |
| 23 | |
| 24 | func (n *Node) run(wg *sync.WaitGroup) { |
| 25 | ticker := time.Tick(20 * time.Millisecond) |
| 26 | |
| 27 | for { |
| 28 | select { |
| 29 | case <-ticker: |
| 30 | n.Raft().Tick() |
| 31 | case rd := <-n.Raft().Ready(): |
| 32 | n.SaveToStorage(&rd.HardState, rd.Entries, &rd.Snapshot) |
| 33 | for _, entry := range rd.CommittedEntries { |
| 34 | if entry.Type == raftpb.EntryConfChange { |
| 35 | var cc raftpb.ConfChange |
| 36 | if err := cc.Unmarshal(entry.Data); err != nil { |
| 37 | fmt.Printf("error in unmarshalling: %v\n", err) |
| 38 | } |
| 39 | n.Raft().ApplyConfChange(cc) |
| 40 | } else if entry.Type == raftpb.EntryNormal { |
| 41 | if bytes.HasPrefix(entry.Data, []byte("hey")) { |
| 42 | wg.Done() |
| 43 | } |
| 44 | } |
| 45 | } |
| 46 | n.Raft().Advance() |
| 47 | } |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | func TestProposal(t *testing.T) { |
| 52 | dir := t.TempDir() |
no test coverage detected