From 32c38820c17f564b4699f3c8c98b968492fd074d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 12 Oct 2014 00:34:22 -0700 Subject: [PATCH] raft: protobuf messageType --- raft/node.go | 10 +-- raft/node_test.go | 16 ++--- raft/raft.go | 94 +++++++++---------------- raft/raft_test.go | 154 ++++++++++++++++++++--------------------- raft/raftpb/raft.pb.go | 79 +++++++++++++++++---- raft/raftpb/raft.proto | 32 ++++++--- 6 files changed, 211 insertions(+), 174 deletions(-) diff --git a/raft/node.go b/raft/node.go index ccfd287f1..329bc509a 100644 --- a/raft/node.go +++ b/raft/node.go @@ -264,16 +264,16 @@ func (n *node) Tick() { } func (n *node) Campaign(ctx context.Context) error { - return n.step(ctx, pb.Message{Type: msgHup}) + return n.step(ctx, pb.Message{Type: pb.MsgHup}) } func (n *node) Propose(ctx context.Context, data []byte) error { - return n.step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) + return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) } func (n *node) Step(ctx context.Context, m pb.Message) error { // ignore unexpected local messages receiving over network - if m.Type == msgHup || m.Type == msgBeat { + if m.Type == pb.MsgHup || m.Type == pb.MsgBeat { // TODO: return an error? return nil } @@ -285,14 +285,14 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { if err != nil { return err } - return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) + return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) } // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. func (n *node) step(ctx context.Context, m pb.Message) error { ch := n.recvc - if m.Type == msgProp { + if m.Type == pb.MsgProp { ch = n.propc } diff --git a/raft/node_test.go b/raft/node_test.go index ba608076e..72b95ddaa 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -13,32 +13,32 @@ import ( // TestNodeStep ensures that node.Step sends msgProp to propc chan // and other kinds of messages to recvc chan. func TestNodeStep(t *testing.T) { - for i := range mtmap { + for i, msgn := range raftpb.MessageType_name { n := &node{ propc: make(chan raftpb.Message, 1), recvc: make(chan raftpb.Message, 1), } - msgt := uint64(i) + msgt := raftpb.MessageType(i) n.Step(context.TODO(), raftpb.Message{Type: msgt}) // Proposal goes to proc chan. Others go to recvc chan. - if uint64(i) == msgProp { + if msgt == raftpb.MsgProp { select { case <-n.propc: default: - t.Errorf("%d: cannot receive %s on propc chan", i, mtmap[i]) + t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn) } } else { - if msgt == msgBeat || msgt == msgHup { + if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup { select { case <-n.recvc: - t.Errorf("%d: step should ignore %s", i, mtmap[i]) + t.Errorf("%d: step should ignore %s", msgt, msgn) default: } } else { select { case <-n.recvc: default: - t.Errorf("%d: cannot receive %s on recvc chan", i, mtmap[i]) + t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn) } } } @@ -67,7 +67,7 @@ func TestNodeStepUnblock(t *testing.T) { for i, tt := range tests { errc := make(chan error, 1) go func() { - err := n.Step(ctx, raftpb.Message{Type: msgProp}) + err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp}) errc <- err }() tt.unblock() diff --git a/raft/raft.go b/raft/raft.go index 3532273e0..fdb460bb6 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -12,36 +12,6 @@ import ( // None is a placeholder node ID used when there is no leader. const None uint64 = 0 -type messageType uint64 - -const ( - msgHup uint64 = iota - msgBeat - msgProp - msgApp - msgAppResp - msgVote - msgVoteResp - msgSnap - msgDenied -) - -var mtmap = [...]string{ - "msgHup", - "msgBeat", - "msgProp", - "msgApp", - "msgAppResp", - "msgVote", - "msgVoteResp", - "msgSnap", - "msgDenied", -} - -func (mt messageType) String() string { - return mtmap[uint64(mt)] -} - var errNoLeader = errors.New("no leader") // Possible values for StateType. @@ -192,7 +162,7 @@ func (r *raft) send(m pb.Message) { // do not attach term to msgProp // proposals are a way to forward to the leader and // should be treated as local message. - if m.Type != msgProp { + if m.Type != pb.MsgProp { m.Term = r.Term } r.msgs = append(r.msgs, m) @@ -205,10 +175,10 @@ func (r *raft) sendAppend(to uint64) { m.To = to m.Index = pr.next - 1 if r.needSnapshot(m.Index) { - m.Type = msgSnap + m.Type = pb.MsgSnap m.Snapshot = r.raftLog.snapshot } else { - m.Type = msgApp + m.Type = pb.MsgApp m.LogTerm = r.raftLog.term(pr.next - 1) m.Entries = r.raftLog.entries(pr.next) m.Commit = r.raftLog.committed @@ -220,7 +190,7 @@ func (r *raft) sendAppend(to uint64) { func (r *raft) sendHeartbeat(to uint64) { m := pb.Message{ To: to, - Type: msgApp, + Type: pb.MsgApp, } r.send(m) } @@ -293,7 +263,7 @@ func (r *raft) tickElection() { r.elapsed++ if r.isElectionTimeout() { r.elapsed = 0 - r.Step(pb.Message{From: r.id, Type: msgHup}) + r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) } } @@ -302,7 +272,7 @@ func (r *raft) tickHeartbeat() { r.elapsed++ if r.elapsed > r.heartbeatTimeout { r.elapsed = 0 - r.Step(pb.Message{From: r.id, Type: msgBeat}) + r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) } } @@ -365,7 +335,7 @@ func (r *raft) campaign() { continue } lasti := r.raftLog.lastIndex() - r.send(pb.Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) + r.send(pb.Message{To: i, Type: pb.MsgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) } } @@ -375,18 +345,18 @@ func (r *raft) Step(m pb.Message) error { if r.removed[m.From] { if m.From != r.id { - r.send(pb.Message{To: m.From, Type: msgDenied}) + r.send(pb.Message{To: m.From, Type: pb.MsgDenied}) } // TODO: return an error? return nil } - if m.Type == msgDenied { + if m.Type == pb.MsgDenied { r.removed[r.id] = true // TODO: return an error? return nil } - if m.Type == msgHup { + if m.Type == pb.MsgHup { r.campaign() } @@ -395,7 +365,7 @@ func (r *raft) Step(m pb.Message) error { // local message case m.Term > r.Term: lead := m.From - if m.Type == msgVote { + if m.Type == pb.MsgVote { lead = None } r.becomeFollower(m.Term, lead) @@ -409,17 +379,17 @@ func (r *raft) Step(m pb.Message) error { func (r *raft) handleAppendEntries(m pb.Message) { if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { - r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { - r.send(pb.Message{To: m.From, Type: msgAppResp, Index: m.Index, Reject: true}) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true}) } } func (r *raft) handleSnapshot(m pb.Message) { if r.restore(m.Snapshot) { - r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { - r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.committed}) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) } } @@ -438,9 +408,9 @@ type stepFunc func(r *raft, m pb.Message) func stepLeader(r *raft, m pb.Message) { switch m.Type { - case msgBeat: + case pb.MsgBeat: r.bcastHeartbeat() - case msgProp: + case pb.MsgProp: if len(m.Entries) != 1 { panic("unexpected length(entries) of a msgProp") } @@ -453,7 +423,7 @@ func stepLeader(r *raft, m pb.Message) { } r.appendEntry(e) r.bcastAppend() - case msgAppResp: + case pb.MsgAppResp: if m.Reject { if r.prs[m.From].maybeDecrTo(m.Index) { r.sendAppend(m.From) @@ -464,24 +434,24 @@ func stepLeader(r *raft, m pb.Message) { r.bcastAppend() } } - case msgVote: - r.send(pb.Message{To: m.From, Type: msgVoteResp, Reject: true}) + case pb.MsgVote: + r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) } } func stepCandidate(r *raft, m pb.Message) { switch m.Type { - case msgProp: + case pb.MsgProp: panic("no leader") - case msgApp: + case pb.MsgApp: r.becomeFollower(r.Term, m.From) r.handleAppendEntries(m) - case msgSnap: + case pb.MsgSnap: r.becomeFollower(m.Term, m.From) r.handleSnapshot(m) - case msgVote: - r.send(pb.Message{To: m.From, Type: msgVoteResp, Reject: true}) - case msgVoteResp: + case pb.MsgVote: + r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) + case pb.MsgVoteResp: gr := r.poll(m.From, !m.Reject) switch r.q() { case gr: @@ -495,26 +465,26 @@ func stepCandidate(r *raft, m pb.Message) { func stepFollower(r *raft, m pb.Message) { switch m.Type { - case msgProp: + case pb.MsgProp: if r.lead == None { panic("no leader") } m.To = r.lead r.send(m) - case msgApp: + case pb.MsgApp: r.elapsed = 0 r.lead = m.From r.handleAppendEntries(m) - case msgSnap: + case pb.MsgSnap: r.elapsed = 0 r.handleSnapshot(m) - case msgVote: + case pb.MsgVote: if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.elapsed = 0 r.Vote = m.From - r.send(pb.Message{To: m.From, Type: msgVoteResp}) + r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp}) } else { - r.send(pb.Message{To: m.From, Type: msgVoteResp, Reject: true}) + r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) } } } diff --git a/raft/raft_test.go b/raft/raft_test.go index bd94e656d..328f678cd 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -43,7 +43,7 @@ func TestLeaderElection(t *testing.T) { } for i, tt := range tests { - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) sm := tt.network.peers[1].(*raft) if sm.state != tt.state { t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state) @@ -63,23 +63,23 @@ func TestLogReplication(t *testing.T) { { newNetwork(nil, nil, nil), []pb.Message{ - {From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, + {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, }, 2, }, { newNetwork(nil, nil, nil), []pb.Message{ - {From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, - {From: 1, To: 2, Type: msgHup}, - {From: 1, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, + {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, + {From: 1, To: 2, Type: pb.MsgHup}, + {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, }, 4, }, } for i, tt := range tests { - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) for _, m := range tt.msgs { tt.send(m) @@ -100,7 +100,7 @@ func TestLogReplication(t *testing.T) { } props := []pb.Message{} for _, m := range tt.msgs { - if m.Type == msgProp { + if m.Type == pb.MsgProp { props = append(props, m) } } @@ -115,9 +115,9 @@ func TestLogReplication(t *testing.T) { func TestSingleNodeCommit(t *testing.T) { tt := newNetwork(nil) - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) - tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) - tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) sm := tt.peers[1].(*raft) if sm.raftLog.committed != 3 { @@ -130,15 +130,15 @@ func TestSingleNodeCommit(t *testing.T) { // filtered. func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt := newNetwork(nil, nil, nil, nil, nil) - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) // 0 cannot reach 2,3,4 tt.cut(1, 3) tt.cut(1, 4) tt.cut(1, 5) - tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) - tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) sm := tt.peers[1].(*raft) if sm.raftLog.committed != 1 { @@ -148,10 +148,10 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { // network recovery tt.recover() // avoid committing ChangeTerm proposal - tt.ignore(msgApp) + tt.ignore(pb.MsgApp) // elect 1 as the new leader with term 2 - tt.send(pb.Message{From: 2, To: 2, Type: msgHup}) + tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) // no log entries from previous term should be committed sm = tt.peers[2].(*raft) @@ -162,7 +162,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt.recover() // still be able to append a entry - tt.send(pb.Message{From: 2, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) if sm.raftLog.committed != 5 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5) @@ -173,15 +173,15 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { // when leader changes, no new proposal comes in. func TestCommitWithoutNewTermEntry(t *testing.T) { tt := newNetwork(nil, nil, nil, nil, nil) - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) // 0 cannot reach 2,3,4 tt.cut(1, 3) tt.cut(1, 4) tt.cut(1, 5) - tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) - tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) sm := tt.peers[1].(*raft) if sm.raftLog.committed != 1 { @@ -194,7 +194,7 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { // elect 1 as the new leader with term 2 // after append a ChangeTerm entry from the current term, all entries // should be committed - tt.send(pb.Message{From: 2, To: 2, Type: msgHup}) + tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) if sm.raftLog.committed != 4 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4) @@ -209,11 +209,11 @@ func TestDuelingCandidates(t *testing.T) { nt := newNetwork(a, b, c) nt.cut(1, 3) - nt.send(pb.Message{From: 1, To: 1, Type: msgHup}) - nt.send(pb.Message{From: 3, To: 3, Type: msgHup}) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) nt.recover() - nt.send(pb.Message{From: 3, To: 3, Type: msgHup}) + nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1} tests := []struct { @@ -250,15 +250,15 @@ func TestCandidateConcede(t *testing.T) { tt := newNetwork(nil, nil, nil) tt.isolate(1) - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) - tt.send(pb.Message{From: 3, To: 3, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) // heal the partition tt.recover() data := []byte("force follower") // send a proposal to 2 to flush out a msgApp to 0 - tt.send(pb.Message{From: 3, To: 3, Type: msgProp, Entries: []pb.Entry{{Data: data}}}) + tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) a := tt.peers[1].(*raft) if g := a.state; g != StateFollower { @@ -282,7 +282,7 @@ func TestCandidateConcede(t *testing.T) { func TestSingleNodeCandidate(t *testing.T) { tt := newNetwork(nil) - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) sm := tt.peers[1].(*raft) if sm.state != StateLeader { @@ -293,11 +293,11 @@ func TestSingleNodeCandidate(t *testing.T) { func TestOldMessages(t *testing.T) { tt := newNetwork(nil, nil, nil) // make 0 leader @ term 3 - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) - tt.send(pb.Message{From: 2, To: 2, Type: msgHup}) - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) // pretend we're an old leader trying to make progress - tt.send(pb.Message{From: 1, To: 1, Type: msgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}}) l := &raftLog{ ents: []pb.Entry{ @@ -351,8 +351,8 @@ func TestProposal(t *testing.T) { data := []byte("somedata") // promote 0 the leader - send(pb.Message{From: 1, To: 1, Type: msgHup}) - send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: data}}}) + send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) wantLog := newLog() if tt.success { @@ -385,10 +385,10 @@ func TestProposalByProxy(t *testing.T) { for i, tt := range tests { // promote 0 the leader - tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) // propose via follower - tt.send(pb.Message{From: 2, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2} base := ltoa(wantLog) @@ -545,7 +545,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) { sm := newRaft(1, []uint64{1}, 10, 1) sm.step = fakeStep sm.Term = 2 - sm.Step(pb.Message{Type: msgApp, Term: sm.Term - 1}) + sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1}) if called == true { t.Errorf("stepFunc called = %v , want %v", called, false) } @@ -564,21 +564,21 @@ func TestHandleMsgApp(t *testing.T) { wReject bool }{ // Ensure 1 - {pb.Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch - {pb.Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist // Ensure 2 - {pb.Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false}, - {pb.Message{Type: msgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false}, - {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false}, - {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false}, - {pb.Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // Ensure 3 - {pb.Message{Type: msgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1 - {pb.Message{Type: msgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2 - {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2 - {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last() + {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1 + {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2 + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2 + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last() } for i, tt := range tests { @@ -653,7 +653,7 @@ func TestRecvMsgVote(t *testing.T) { sm.HardState = pb.HardState{Vote: tt.voteFor} sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}} - sm.Step(pb.Message{Type: msgVote, From: 2, Index: tt.i, LogTerm: tt.term}) + sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term}) msgs := sm.ReadMessages() if g := len(msgs); g != 1 { @@ -732,7 +732,7 @@ func TestAllServerStepdown(t *testing.T) { {StateLeader, StateFollower, 3, 2}, } - tmsgTypes := [...]uint64{msgVote, msgApp} + tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp} tterm := uint64(3) for i, tt := range tests { @@ -760,7 +760,7 @@ func TestAllServerStepdown(t *testing.T) { t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex) } wlead := uint64(2) - if msgType == msgVote { + if msgType == pb.MsgVote { wlead = None } if sm.lead != wlead { @@ -791,7 +791,7 @@ func TestLeaderAppResp(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() sm.ReadMessages() - sm.Step(pb.Message{From: 2, Type: msgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject}) + sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject}) msgs := sm.ReadMessages() if len(msgs) != tt.wmsgNum { @@ -828,15 +828,15 @@ func TestBcastBeat(t *testing.T) { sm.appendEntry(pb.Entry{}) } - sm.Step(pb.Message{Type: msgBeat}) + sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.ReadMessages() if len(msgs) != 2 { t.Fatalf("len(msgs) = %v, want 1", len(msgs)) } tomap := map[uint64]bool{2: true, 3: true} for i, m := range msgs { - if m.Type != msgApp { - t.Fatalf("#%d: type = %v, want = %v", i, m.Type, msgApp) + if m.Type != pb.MsgApp { + t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp) } if m.Index != 0 { t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0) @@ -880,15 +880,15 @@ func TestRecvMsgBeat(t *testing.T) { case StateLeader: sm.step = stepLeader } - sm.Step(pb.Message{From: 1, To: 1, Type: msgBeat}) + sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) msgs := sm.ReadMessages() if len(msgs) != tt.wMsg { t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg) } for _, m := range msgs { - if m.Type != msgApp { - t.Errorf("%d: msg.type = %v, want %v", i, m.Type, msgApp) + if m.Type != pb.MsgApp { + t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp) } } } @@ -950,14 +950,14 @@ func TestProvideSnap(t *testing.T) { // node 1 needs a snapshot sm.prs[2].next = sm.raftLog.offset - sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: sm.prs[2].next - 1, Reject: true}) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true}) msgs := sm.ReadMessages() if len(msgs) != 1 { t.Fatalf("len(msgs) = %d, want 1", len(msgs)) } m := msgs[0] - if m.Type != msgSnap { - t.Errorf("m.Type = %v, want %v", m.Type, msgSnap) + if m.Type != pb.MsgSnap { + t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap) } } @@ -967,7 +967,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { Term: defaultCompactThreshold + 1, Nodes: []uint64{1, 2}, } - m := pb.Message{Type: msgSnap, From: 1, Term: 2, Snapshot: s} + m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s} sm := newRaft(2, []uint64{1, 2}, 10, 1) sm.Step(m) @@ -979,11 +979,11 @@ func TestRestoreFromSnapMsg(t *testing.T) { func TestSlowNodeRestore(t *testing.T) { nt := newNetwork(nil, nil, nil) - nt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) nt.isolate(3) for j := 0; j < defaultCompactThreshold+1; j++ { - nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}}) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) } lead := nt.peers[1].(*raft) nextEnts(lead) @@ -991,14 +991,14 @@ func TestSlowNodeRestore(t *testing.T) { nt.recover() // trigger a snapshot - nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}}) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) follower := nt.peers[3].(*raft) if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) { t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot) } // trigger a commit - nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}}) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) if follower.raftLog.committed != lead.raftLog.committed { t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed) } @@ -1012,7 +1012,7 @@ func TestStepConfig(t *testing.T) { r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) if g := r.raftLog.lastIndex(); g != index+1 { t.Errorf("index = %d, want %d", g, index+1) } @@ -1029,10 +1029,10 @@ func TestStepIgnoreConfig(t *testing.T) { r := newRaft(1, []uint64{1, 2}, 10, 1) r.becomeCandidate() r.becomeLeader() - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) index := r.raftLog.lastIndex() pendingConf := r.pendingConf - r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) if g := r.raftLog.lastIndex(); g != index { t.Errorf("index = %d, want %d", g, index) } @@ -1123,7 +1123,7 @@ func TestRecvMsgDenied(t *testing.T) { } r := newRaft(1, []uint64{1, 2}, 10, 1) r.step = fakeStep - r.Step(pb.Message{From: 2, Type: msgDenied}) + r.Step(pb.Message{From: 2, Type: pb.MsgDenied}) if called != false { t.Errorf("stepFunc called = %v , want %v", called, false) } @@ -1152,7 +1152,7 @@ func TestRecvMsgFromRemovedNode(t *testing.T) { r := newRaft(1, []uint64{1}, 10, 1) r.step = fakeStep r.removeNode(tt.from) - r.Step(pb.Message{From: tt.from, Type: msgVote}) + r.Step(pb.Message{From: tt.from, Type: pb.MsgVote}) if called != false { t.Errorf("#%d: stepFunc called = %v , want %v", i, called, false) } @@ -1160,8 +1160,8 @@ func TestRecvMsgFromRemovedNode(t *testing.T) { t.Errorf("#%d: len(msgs) = %d, want %d", i, len(r.msgs), tt.wmsgNum) } for j, msg := range r.msgs { - if msg.Type != msgDenied { - t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, msgDenied) + if msg.Type != pb.MsgDenied { + t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, pb.MsgDenied) } } } @@ -1203,7 +1203,7 @@ func ents(terms ...uint64) *raft { type network struct { peers map[uint64]Interface dropm map[connem]float64 - ignorem map[uint64]bool + ignorem map[pb.MessageType]bool } // newNetwork initializes a network from peers. @@ -1242,7 +1242,7 @@ func newNetwork(peers ...Interface) *network { return &network{ peers: npeers, dropm: make(map[connem]float64), - ignorem: make(map[uint64]bool), + ignorem: make(map[pb.MessageType]bool), } } @@ -1274,13 +1274,13 @@ func (nw *network) isolate(id uint64) { } } -func (nw *network) ignore(t uint64) { +func (nw *network) ignore(t pb.MessageType) { nw.ignorem[t] = true } func (nw *network) recover() { nw.dropm = make(map[connem]float64) - nw.ignorem = make(map[uint64]bool) + nw.ignorem = make(map[pb.MessageType]bool) } func (nw *network) filter(msgs []pb.Message) []pb.Message { @@ -1290,7 +1290,7 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message { continue } switch m.Type { - case msgHup: + case pb.MsgHup: // hups never go over the network, so don't drop them but panic panic("unexpected msgHup") default: diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 79fd902ee..970476fc8 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -64,6 +64,60 @@ func (x *EntryType) UnmarshalJSON(data []byte) error { return nil } +type MessageType int32 + +const ( + MsgHup MessageType = 0 + MsgBeat MessageType = 1 + MsgProp MessageType = 2 + MsgApp MessageType = 3 + MsgAppResp MessageType = 4 + MsgVote MessageType = 5 + MsgVoteResp MessageType = 6 + MsgSnap MessageType = 7 + MsgDenied MessageType = 8 +) + +var MessageType_name = map[int32]string{ + 0: "MsgHup", + 1: "MsgBeat", + 2: "MsgProp", + 3: "MsgApp", + 4: "MsgAppResp", + 5: "MsgVote", + 6: "MsgVoteResp", + 7: "MsgSnap", + 8: "MsgDenied", +} +var MessageType_value = map[string]int32{ + "MsgHup": 0, + "MsgBeat": 1, + "MsgProp": 2, + "MsgApp": 3, + "MsgAppResp": 4, + "MsgVote": 5, + "MsgVoteResp": 6, + "MsgSnap": 7, + "MsgDenied": 8, +} + +func (x MessageType) Enum() *MessageType { + p := new(MessageType) + *p = x + return p +} +func (x MessageType) String() string { + return proto.EnumName(MessageType_name, int32(x)) +} +func (x *MessageType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(MessageType_value, data, "MessageType") + if err != nil { + return err + } + *x = MessageType(value) + return nil +} + type ConfChangeType int32 const ( @@ -123,17 +177,17 @@ func (m *Snapshot) String() string { return proto.CompactTextString(m) } func (*Snapshot) ProtoMessage() {} type Message struct { - Type uint64 `protobuf:"varint,1,req,name=type" json:"type"` - To uint64 `protobuf:"varint,2,req,name=to" json:"to"` - From uint64 `protobuf:"varint,3,req,name=from" json:"from"` - Term uint64 `protobuf:"varint,4,req,name=term" json:"term"` - LogTerm uint64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"` - Index uint64 `protobuf:"varint,6,req,name=index" json:"index"` - Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"` - Commit uint64 `protobuf:"varint,8,req,name=commit" json:"commit"` - Snapshot Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"` - Reject bool `protobuf:"varint,10,req,name=reject" json:"reject"` - XXX_unrecognized []byte `json:"-"` + Type MessageType `protobuf:"varint,1,req,name=type,enum=raftpb.MessageType" json:"type"` + To uint64 `protobuf:"varint,2,req,name=to" json:"to"` + From uint64 `protobuf:"varint,3,req,name=from" json:"from"` + Term uint64 `protobuf:"varint,4,req,name=term" json:"term"` + LogTerm uint64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"` + Index uint64 `protobuf:"varint,6,req,name=index" json:"index"` + Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"` + Commit uint64 `protobuf:"varint,8,req,name=commit" json:"commit"` + Snapshot Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"` + Reject bool `protobuf:"varint,10,req,name=reject" json:"reject"` + XXX_unrecognized []byte `json:"-"` } func (m *Message) Reset() { *m = Message{} } @@ -165,6 +219,7 @@ func (*ConfChange) ProtoMessage() {} func init() { proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) + proto.RegisterEnum("raftpb.MessageType", MessageType_name, MessageType_value) proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value) } func (m *Entry) Unmarshal(data []byte) error { @@ -433,7 +488,7 @@ func (m *Message) Unmarshal(data []byte) error { } b := data[index] index++ - m.Type |= (uint64(b) & 0x7F) << shift + m.Type |= (MessageType(b) & 0x7F) << shift if b < 0x80 { break } diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index c6db9fa79..52809fa79 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -28,17 +28,29 @@ message Snapshot { repeated uint64 removed_nodes = 5 [(gogoproto.nullable) = false]; } +enum MessageType { + MsgHup = 0; + MsgBeat = 1; + MsgProp = 2; + MsgApp = 3; + MsgAppResp = 4; + MsgVote = 5; + MsgVoteResp = 6; + MsgSnap = 7; + MsgDenied = 8; +} + message Message { - required uint64 type = 1 [(gogoproto.nullable) = false]; - required uint64 to = 2 [(gogoproto.nullable) = false]; - required uint64 from = 3 [(gogoproto.nullable) = false]; - required uint64 term = 4 [(gogoproto.nullable) = false]; - required uint64 logTerm = 5 [(gogoproto.nullable) = false]; - required uint64 index = 6 [(gogoproto.nullable) = false]; - repeated Entry entries = 7 [(gogoproto.nullable) = false]; - required uint64 commit = 8 [(gogoproto.nullable) = false]; - required Snapshot snapshot = 9 [(gogoproto.nullable) = false]; - required bool reject = 10 [(gogoproto.nullable) = false]; + required MessageType type = 1 [(gogoproto.nullable) = false]; + required uint64 to = 2 [(gogoproto.nullable) = false]; + required uint64 from = 3 [(gogoproto.nullable) = false]; + required uint64 term = 4 [(gogoproto.nullable) = false]; + required uint64 logTerm = 5 [(gogoproto.nullable) = false]; + required uint64 index = 6 [(gogoproto.nullable) = false]; + repeated Entry entries = 7 [(gogoproto.nullable) = false]; + required uint64 commit = 8 [(gogoproto.nullable) = false]; + required Snapshot snapshot = 9 [(gogoproto.nullable) = false]; + required bool reject = 10 [(gogoproto.nullable) = false]; } message HardState {