diff --git a/etcdserver2/etcdhttp/http.go b/etcdserver2/etcdhttp/http.go index 4e6ea318c..827ecce07 100644 --- a/etcdserver2/etcdhttp/http.go +++ b/etcdserver2/etcdhttp/http.go @@ -13,7 +13,7 @@ import ( "code.google.com/p/go.net/context" "github.com/coreos/etcd/elog" etcdserver "github.com/coreos/etcd/etcdserver2" - "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) @@ -66,7 +66,7 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R if err != nil { elog.TODO() } - var m raft.Message + var m raftpb.Message if err := m.Unmarshal(b); err != nil { elog.TODO() } diff --git a/etcdserver2/server.go b/etcdserver2/server.go index 953baf395..60c55a14d 100644 --- a/etcdserver2/server.go +++ b/etcdserver2/server.go @@ -7,13 +7,14 @@ import ( "code.google.com/p/go.net/context" "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" "github.com/coreos/etcd/wait" ) var ErrUnknownMethod = errors.New("etcdserver: unknown method") -type SendFunc func(m []raft.Message) +type SendFunc func(m []raftpb.Message) type Response struct { // The last seen term raft was at when this request was built. @@ -44,7 +45,7 @@ type Server struct { // Save specifies the save function for saving ents to stable storage. // Save MUST block until st and ents are on stable storage. If Send is // nil, Server will panic. - Save func(st raft.State, ents []raft.Entry) + Save func(st raftpb.State, ents []raftpb.Entry) } func (s *Server) init() { s.w = wait.New() } @@ -116,7 +117,7 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) { } // apply interprets r as a call to store.X and returns an Response interpreted from store.Event -func (s *Server) apply(ctx context.Context, e raft.Entry) (*store.Event, error) { +func (s *Server) apply(ctx context.Context, e raftpb.Entry) (*store.Event, error) { var r Request if err := r.Unmarshal(e.Data); err != nil { return nil, err diff --git a/raft/example_test.go b/raft/example_test.go index c8aa75acb..a8c7331d1 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -1,11 +1,15 @@ package raft -import "code.google.com/p/go.net/context" +import ( + "code.google.com/p/go.net/context" -func applyToStore(ents []Entry) {} -func sendMessages(msgs []Message) {} -func saveStateToDisk(st State) {} -func saveToDisk(ents []Entry) {} + pb "github.com/coreos/etcd/raft/raftpb" +) + +func applyToStore(ents []pb.Entry) {} +func sendMessages(msgs []pb.Message) {} +func saveStateToDisk(st pb.State) {} +func saveToDisk(ents []pb.Entry) {} func Example_Node() { n := Start(context.Background(), 0, nil) @@ -13,11 +17,11 @@ func Example_Node() { // stuff to n happens in other goroutines // the last known state - var prev State + var prev pb.State for { // ReadState blocks until there is new state ready. rd := <-n.Ready() - if !prev.Equal(rd.State) { + if !isStateEqual(prev, rd.State) { saveStateToDisk(rd.State) prev = rd.State } diff --git a/raft/log.go b/raft/log.go index 2676b236b..bd61e3b08 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1,6 +1,10 @@ package raft -import "fmt" +import ( + "fmt" + + pb "github.com/coreos/etcd/raft/raftpb" +) const ( Normal int64 = iota @@ -14,18 +18,18 @@ const ( defaultCompactThreshold = 10000 ) -func (e *Entry) isConfig() bool { +func isConfig(e pb.Entry) bool { return e.Type == AddNode || e.Type == RemoveNode } type raftLog struct { - ents []Entry + ents []pb.Entry unstable int64 committed int64 applied int64 offset int64 - snapshot Snapshot - unstableSnapshot Snapshot + snapshot pb.Snapshot + unstableSnapshot pb.Snapshot // want a compact after the number of entries exceeds the threshold // TODO(xiangli) size might be a better criteria @@ -34,7 +38,7 @@ type raftLog struct { func newLog() *raftLog { return &raftLog{ - ents: make([]Entry, 1), + ents: make([]pb.Entry, 1), unstable: 1, committed: 0, applied: 0, @@ -50,7 +54,7 @@ func (l *raftLog) String() string { return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents)) } -func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...Entry) bool { +func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...pb.Entry) bool { if l.matchTerm(index, logTerm) { from := index + 1 ci := l.findConflict(from, ents) @@ -69,13 +73,13 @@ func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...Entry) bo return false } -func (l *raftLog) append(after int64, ents ...Entry) int64 { +func (l *raftLog) append(after int64, ents ...pb.Entry) int64 { l.ents = append(l.slice(l.offset, after+1), ents...) l.unstable = min(l.unstable, after+1) return l.lastIndex() } -func (l *raftLog) findConflict(from int64, ents []Entry) int64 { +func (l *raftLog) findConflict(from int64, ents []pb.Entry) int64 { for i, ne := range ents { if oe := l.at(from + int64(i)); oe == nil || oe.Term != ne.Term { return from + int64(i) @@ -84,12 +88,12 @@ func (l *raftLog) findConflict(from int64, ents []Entry) int64 { return -1 } -func (l *raftLog) unstableEnts() []Entry { +func (l *raftLog) unstableEnts() []pb.Entry { ents := l.entries(l.unstable) if ents == nil { return nil } - cpy := make([]Entry, len(ents)) + cpy := make([]pb.Entry, len(ents)) copy(cpy, ents) return cpy } @@ -100,13 +104,13 @@ func (l *raftLog) resetUnstable() { // nextEnts returns all the available entries for execution. // all the returned entries will be marked as applied. -func (l *raftLog) nextEnts() (ents []Entry) { +func (l *raftLog) nextEnts() (ents []pb.Entry) { if l.committed > l.applied { ents := l.slice(l.applied+1, l.committed+1) if ents == nil { return nil } - cpy := make([]Entry, len(ents)) + cpy := make([]pb.Entry, len(ents)) copy(cpy, ents) return cpy } @@ -130,7 +134,7 @@ func (l *raftLog) term(i int64) int64 { return -1 } -func (l *raftLog) entries(i int64) []Entry { +func (l *raftLog) entries(i int64) []pb.Entry { // never send out the first entry // first entry is only used for matching // prevLogTerm @@ -176,15 +180,15 @@ func (l *raftLog) compact(i int64) int64 { } func (l *raftLog) snap(d []byte, index, term int64, nodes []int64) { - l.snapshot = Snapshot{d, nodes, index, term, nil} + l.snapshot = pb.Snapshot{d, nodes, index, term, nil} } func (l *raftLog) shouldCompact() bool { return (l.applied - l.offset) > l.compactThreshold } -func (l *raftLog) restore(s Snapshot) { - l.ents = []Entry{{Term: s.Term}} +func (l *raftLog) restore(s pb.Snapshot) { + l.ents = []pb.Entry{{Term: s.Term}} l.unstable = s.Index + 1 l.committed = s.Index l.applied = s.Index @@ -192,7 +196,7 @@ func (l *raftLog) restore(s Snapshot) { l.snapshot = s } -func (l *raftLog) at(i int64) *Entry { +func (l *raftLog) at(i int64) *pb.Entry { if l.isOutOfBounds(i) { return nil } @@ -200,7 +204,7 @@ func (l *raftLog) at(i int64) *Entry { } // slice get a slice of log entries from lo through hi-1, inclusive. -func (l *raftLog) slice(lo int64, hi int64) []Entry { +func (l *raftLog) slice(lo int64, hi int64) []pb.Entry { if lo >= hi { return nil } diff --git a/raft/log_test.go b/raft/log_test.go index 5af620b4f..6b3217a5f 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -3,6 +3,8 @@ package raft import ( "reflect" "testing" + + pb "github.com/coreos/etcd/raft/raftpb" ) // TestAppend ensures: @@ -11,43 +13,43 @@ import ( // follow it // 2.Append any new entries not already in the log func TestAppend(t *testing.T) { - previousEnts := []Entry{{Term: 1}, {Term: 2}} + previousEnts := []pb.Entry{{Term: 1}, {Term: 2}} previousUnstable := int64(3) tests := []struct { after int64 - ents []Entry + ents []pb.Entry windex int64 - wents []Entry + wents []pb.Entry wunstable int64 }{ { 2, - []Entry{}, + []pb.Entry{}, 2, - []Entry{{Term: 1}, {Term: 2}}, + []pb.Entry{{Term: 1}, {Term: 2}}, 3, }, { 2, - []Entry{{Term: 2}}, + []pb.Entry{{Term: 2}}, 3, - []Entry{{Term: 1}, {Term: 2}, {Term: 2}}, + []pb.Entry{{Term: 1}, {Term: 2}, {Term: 2}}, 3, }, // conflicts with index 1 { 0, - []Entry{{Term: 2}}, + []pb.Entry{{Term: 2}}, 1, - []Entry{{Term: 2}}, + []pb.Entry{{Term: 2}}, 1, }, // conflicts with index 2 { 1, - []Entry{{Term: 3}, {Term: 3}}, + []pb.Entry{{Term: 3}, {Term: 3}}, 3, - []Entry{{Term: 1}, {Term: 3}, {Term: 3}}, + []pb.Entry{{Term: 1}, {Term: 3}, {Term: 3}}, 2, }, } @@ -77,7 +79,7 @@ func TestCompactionSideEffects(t *testing.T) { raftLog := newLog() for i = 0; i < lastIndex; i++ { - raftLog.append(int64(i), Entry{Term: int64(i + 1), Index: int64(i + 1)}) + raftLog.append(int64(i), pb.Entry{Term: int64(i + 1), Index: int64(i + 1)}) } raftLog.compact(500) @@ -107,7 +109,7 @@ func TestCompactionSideEffects(t *testing.T) { } prev := raftLog.lastIndex() - raftLog.append(raftLog.lastIndex(), Entry{Term: raftLog.lastIndex() + 1}) + raftLog.append(raftLog.lastIndex(), pb.Entry{Term: raftLog.lastIndex() + 1}) if raftLog.lastIndex() != prev+1 { t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1) } @@ -119,10 +121,10 @@ func TestCompactionSideEffects(t *testing.T) { } func TestUnstableEnts(t *testing.T) { - previousEnts := []Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} + previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { unstable int64 - wents []Entry + wents []pb.Entry wunstable int64 }{ {3, nil, 3}, @@ -171,7 +173,7 @@ func TestCompaction(t *testing.T) { raftLog := newLog() for i := 0; i < tt.app; i++ { - raftLog.append(int64(i), Entry{}) + raftLog.append(int64(i), pb.Entry{}) } for j := 0; j < len(tt.compact); j++ { @@ -188,12 +190,12 @@ func TestLogRestore(t *testing.T) { var i int64 raftLog := newLog() for i = 0; i < 100; i++ { - raftLog.append(i, Entry{Term: i + 1}) + raftLog.append(i, pb.Entry{Term: i + 1}) } index := int64(1000) term := int64(1000) - raftLog.restore(Snapshot{Index: index, Term: term}) + raftLog.restore(pb.Snapshot{Index: index, Term: term}) // only has the guard entry if len(raftLog.ents) != 1 { @@ -219,7 +221,7 @@ func TestLogRestore(t *testing.T) { func TestIsOutOfBounds(t *testing.T) { offset := int64(100) num := int64(100) - l := &raftLog{offset: offset, ents: make([]Entry, num)} + l := &raftLog{offset: offset, ents: make([]pb.Entry, num)} tests := []struct { index int64 @@ -247,17 +249,17 @@ func TestAt(t *testing.T) { l := &raftLog{offset: offset} for i = 0; i < num; i++ { - l.ents = append(l.ents, Entry{Term: i}) + l.ents = append(l.ents, pb.Entry{Term: i}) } tests := []struct { index int64 - w *Entry + w *pb.Entry }{ {offset - 1, nil}, - {offset, &Entry{Term: 0}}, - {offset + num/2, &Entry{Term: num / 2}}, - {offset + num - 1, &Entry{Term: num - 1}}, + {offset, &pb.Entry{Term: 0}}, + {offset + num/2, &pb.Entry{Term: num / 2}}, + {offset + num - 1, &pb.Entry{Term: num - 1}}, {offset + num, nil}, } @@ -276,18 +278,18 @@ func TestSlice(t *testing.T) { l := &raftLog{offset: offset} for i = 0; i < num; i++ { - l.ents = append(l.ents, Entry{Term: i}) + l.ents = append(l.ents, pb.Entry{Term: i}) } tests := []struct { from int64 to int64 - w []Entry + w []pb.Entry }{ {offset - 1, offset + 1, nil}, - {offset, offset + 1, []Entry{{Term: 0}}}, - {offset + num/2, offset + num/2 + 1, []Entry{{Term: num / 2}}}, - {offset + num - 1, offset + num, []Entry{{Term: num - 1}}}, + {offset, offset + 1, []pb.Entry{{Term: 0}}}, + {offset + num/2, offset + num/2 + 1, []pb.Entry{{Term: num / 2}}}, + {offset + num - 1, offset + num, []pb.Entry{{Term: num - 1}}}, {offset + num, offset + num + 1, nil}, {offset + num/2, offset + num/2, nil}, diff --git a/raft/node.go b/raft/node.go index 7e8ac8902..42fa40cc5 100644 --- a/raft/node.go +++ b/raft/node.go @@ -3,38 +3,40 @@ package raft import ( "code.google.com/p/go.net/context" + + pb "github.com/coreos/etcd/raft/raftpb" ) type Ready struct { // The current state of a Node - State + pb.State // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. - Entries []Entry + Entries []pb.Entry // CommittedEntries specifies entries to be committed to a // store/state-machine. These have previously been committed to stable // store. - CommittedEntries []Entry + CommittedEntries []pb.Entry // Messages specifies outbound messages to be sent AFTER Entries are // committed to stable storage. - Messages []Message + Messages []pb.Message } -func (a State) Equal(b State) bool { +func isStateEqual(a, b pb.State) bool { return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex } func (rd Ready) containsUpdates(prev Ready) bool { - return !prev.State.Equal(rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 + return !isStateEqual(prev.State, rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 } type Node struct { ctx context.Context - propc chan Message - recvc chan Message + propc chan pb.Message + recvc chan pb.Message readyc chan Ready tickc chan struct{} } @@ -42,8 +44,8 @@ type Node struct { func Start(ctx context.Context, id int64, peers []int64) Node { n := Node{ ctx: ctx, - propc: make(chan Message), - recvc: make(chan Message), + propc: make(chan pb.Message), + recvc: make(chan pb.Message), readyc: make(chan Ready), tickc: make(chan struct{}), } @@ -109,12 +111,12 @@ func (n *Node) Tick() error { // Propose proposes data be appended to the log. func (n *Node) Propose(ctx context.Context, id int64, data []byte) error { - return n.Step(ctx, Message{Type: msgProp, Entries: []Entry{{Id: id, Data: data}}}) + return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Id: id, Data: data}}}) } // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. -func (n *Node) Step(ctx context.Context, m Message) error { +func (n *Node) Step(ctx context.Context, m pb.Message) error { ch := n.recvc if m.Type == msgProp { ch = n.propc @@ -135,7 +137,7 @@ func (n *Node) Ready() <-chan Ready { return n.readyc } -type byMsgType []Message +type byMsgType []pb.Message func (msgs byMsgType) Len() int { return len(msgs) } func (msgs byMsgType) Less(i, j int) bool { return msgs[i].Type == msgProp } diff --git a/raft/raft.go b/raft/raft.go index 0ef143708..f677adaa5 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "sort" + + pb "github.com/coreos/etcd/raft/raftpb" ) const none = -1 @@ -91,7 +93,7 @@ func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } type raft struct { - State + pb.State id int64 @@ -107,7 +109,7 @@ type raft struct { votes map[int64]bool - msgs []Message + msgs []pb.Message // the leader id lead int64 @@ -161,7 +163,7 @@ func (r *raft) poll(id int64, v bool) (granted int) { } // send persists state to stable storage and then sends to its mailbox. -func (r *raft) send(m Message) { +func (r *raft) send(m pb.Message) { m.From = r.id m.Term = r.Term r.msgs = append(r.msgs, m) @@ -170,7 +172,7 @@ func (r *raft) send(m Message) { // sendAppend sends RRPC, with entries to the given peer. func (r *raft) sendAppend(to int64) { pr := r.prs[to] - m := Message{} + m := pb.Message{} m.To = to m.Index = pr.next - 1 if r.needSnapshot(m.Index) { @@ -189,7 +191,7 @@ func (r *raft) sendAppend(to int64) { func (r *raft) sendHeartbeat(to int64) { pr := r.prs[to] index := max(pr.next-1, r.raftLog.lastIndex()) - m := Message{ + m := pb.Message{ To: to, Type: msgApp, Index: index, @@ -248,7 +250,7 @@ func (r *raft) q() int { return len(r.prs)/2 + 1 } -func (r *raft) appendEntry(e Entry) { +func (r *raft) appendEntry(e pb.Entry) { e.Term = r.Term e.Index = r.raftLog.lastIndex() + 1 r.LastIndex = r.raftLog.append(r.raftLog.lastIndex(), e) @@ -283,22 +285,22 @@ func (r *raft) becomeLeader() { r.state = stateLeader for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { - if e.isConfig() { + if isConfig(e) { r.configuring = true } } - r.appendEntry(Entry{Type: Normal, Data: nil}) + r.appendEntry(pb.Entry{Type: Normal, Data: nil}) } -func (r *raft) ReadMessages() []Message { +func (r *raft) ReadMessages() []pb.Message { msgs := r.msgs - r.msgs = make([]Message, 0) + r.msgs = make([]pb.Message, 0) return msgs } -func (r *raft) Step(m Message) error { +func (r *raft) Step(m pb.Message) error { // TODO(bmizerany): this likely allocs - prevent that. defer func() { r.Commit = r.raftLog.committed }() @@ -312,7 +314,7 @@ func (r *raft) Step(m Message) error { continue } lasti := r.raftLog.lastIndex() - r.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) + r.send(pb.Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) } } @@ -333,20 +335,20 @@ func (r *raft) Step(m Message) error { return nil } -func (r *raft) handleAppendEntries(m Message) { +func (r *raft) handleAppendEntries(m pb.Message) { if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { r.LastIndex = r.raftLog.lastIndex() - r.send(Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) + r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) } else { - r.send(Message{To: m.From, Type: msgAppResp, Index: -1}) + r.send(pb.Message{To: m.From, Type: msgAppResp, Index: -1}) } } -func (r *raft) handleSnapshot(m Message) { +func (r *raft) handleSnapshot(m pb.Message) { if r.restore(m.Snapshot) { - r.send(Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) + r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) } else { - r.send(Message{To: m.From, Type: msgAppResp, Index: r.raftLog.committed}) + r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.committed}) } } @@ -363,9 +365,9 @@ func (r *raft) removeNode(id int64) { r.configuring = false } -type stepFunc func(r *raft, m Message) +type stepFunc func(r *raft, m pb.Message) -func stepLeader(r *raft, m Message) { +func stepLeader(r *raft, m pb.Message) { switch m.Type { case msgBeat: r.bcastHeartbeat() @@ -374,7 +376,7 @@ func stepLeader(r *raft, m Message) { panic("unexpected length(entries) of a msgProp") } e := m.Entries[0] - if e.isConfig() { + if isConfig(e) { if r.configuring { panic("pending conf") } @@ -393,11 +395,11 @@ func stepLeader(r *raft, m Message) { } } case msgVote: - r.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: -1}) } } -func stepCandidate(r *raft, m Message) { +func stepCandidate(r *raft, m pb.Message) { switch m.Type { case msgProp: panic("no leader") @@ -408,7 +410,7 @@ func stepCandidate(r *raft, m Message) { r.becomeFollower(m.Term, m.From) r.handleSnapshot(m) case msgVote: - r.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: -1}) case msgVoteResp: gr := r.poll(m.From, m.Index >= 0) switch r.q() { @@ -421,7 +423,7 @@ func stepCandidate(r *raft, m Message) { } } -func stepFollower(r *raft, m Message) { +func stepFollower(r *raft, m pb.Message) { switch m.Type { case msgProp: if r.lead == none { @@ -437,9 +439,9 @@ func stepFollower(r *raft, m Message) { case msgVote: if (r.Vote == none || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.Vote = m.From - r.send(Message{To: m.From, Type: msgVoteResp, Index: r.raftLog.lastIndex()}) + r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: r.raftLog.lastIndex()}) } else { - r.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) + r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: -1}) } } } @@ -451,7 +453,7 @@ func (r *raft) compact(d []byte) { // restore recovers the statemachine from a snapshot. It restores the log and the // configuration of statemachine. -func (r *raft) restore(s Snapshot) bool { +func (r *raft) restore(s pb.Snapshot) bool { if s.Index <= r.raftLog.committed { return false } @@ -496,7 +498,7 @@ func (r *raft) delProgress(id int64) { delete(r.prs, id) } -func (r *raft) loadEnts(ents []Entry) { +func (r *raft) loadEnts(ents []pb.Entry) { if !r.raftLog.isEmpty() { panic("cannot load entries when log is not empty") } @@ -504,7 +506,7 @@ func (r *raft) loadEnts(ents []Entry) { r.raftLog.unstable = r.raftLog.lastIndex() + 1 } -func (r *raft) loadState(state State) { +func (r *raft) loadState(state pb.State) { r.raftLog.committed = state.Commit r.Term = state.Term r.Vote = state.Vote diff --git a/raft/raft_test.go b/raft/raft_test.go index c2682e49b..7be7250c4 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -6,18 +6,20 @@ import ( "reflect" "sort" "testing" + + pb "github.com/coreos/etcd/raft/raftpb" ) // nextEnts returns the appliable entries and updates the applied index -func (r *raft) nextEnts() (ents []Entry) { +func (r *raft) nextEnts() (ents []pb.Entry) { ents = r.raftLog.nextEnts() r.raftLog.resetNextEnts() return ents } type Interface interface { - Step(m Message) error - ReadMessages() []Message + Step(m pb.Message) error + ReadMessages() []pb.Message } func TestLeaderElection(t *testing.T) { @@ -39,7 +41,7 @@ func TestLeaderElection(t *testing.T) { } for i, tt := range tests { - tt.send(Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) sm := tt.network.peers[0].(*raft) if sm.state != tt.state { t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state) @@ -53,30 +55,30 @@ func TestLeaderElection(t *testing.T) { func TestLogReplication(t *testing.T) { tests := []struct { *network - msgs []Message + msgs []pb.Message wcommitted int64 }{ { newNetwork(nil, nil, nil), - []Message{ - {From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, + []pb.Message{ + {From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, }, 2, }, { newNetwork(nil, nil, nil), - []Message{ + []pb.Message{ - {From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, + {From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, {From: 0, To: 1, Type: msgHup}, - {From: 0, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, + {From: 0, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, }, 4, }, } for i, tt := range tests { - tt.send(Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) for _, m := range tt.msgs { tt.send(m) @@ -89,13 +91,13 @@ func TestLogReplication(t *testing.T) { t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted) } - ents := make([]Entry, 0) + ents := make([]pb.Entry, 0) for _, e := range sm.nextEnts() { if e.Data != nil { ents = append(ents, e) } } - props := make([]Message, 0) + props := make([]pb.Message, 0) for _, m := range tt.msgs { if m.Type == msgProp { props = append(props, m) @@ -112,9 +114,9 @@ func TestLogReplication(t *testing.T) { func TestSingleNodeCommit(t *testing.T) { tt := newNetwork(nil) - tt.send(Message{From: 0, To: 0, Type: msgHup}) - tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) - tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*raft) if sm.raftLog.committed != 3 { @@ -127,15 +129,15 @@ func TestSingleNodeCommit(t *testing.T) { // filtered. func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt := newNetwork(nil, nil, nil, nil, nil) - tt.send(Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) // 0 cannot reach 2,3,4 tt.cut(0, 2) tt.cut(0, 3) tt.cut(0, 4) - tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) - tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*raft) if sm.raftLog.committed != 1 { @@ -148,7 +150,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt.ignore(msgApp) // elect 1 as the new leader with term 2 - tt.send(Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) // no log entries from previous term should be committed sm = tt.peers[1].(*raft) @@ -161,14 +163,14 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { // send out a heartbeat // after append a ChangeTerm entry from the current term, all entries // should be committed - tt.send(Message{From: 1, To: 1, Type: msgBeat}) + tt.send(pb.Message{From: 1, To: 1, Type: msgBeat}) if sm.raftLog.committed != 4 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4) } // still be able to append a entry - tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) if sm.raftLog.committed != 5 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5) @@ -179,15 +181,15 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { // when leader changes, no new proposal comes in. func TestCommitWithoutNewTermEntry(t *testing.T) { tt := newNetwork(nil, nil, nil, nil, nil) - tt.send(Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) // 0 cannot reach 2,3,4 tt.cut(0, 2) tt.cut(0, 3) tt.cut(0, 4) - tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) - tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*raft) if sm.raftLog.committed != 1 { @@ -200,7 +202,7 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { // elect 1 as the new leader with term 2 // after append a ChangeTerm entry from the current term, all entries // should be committed - tt.send(Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) if sm.raftLog.committed != 4 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4) @@ -215,13 +217,13 @@ func TestDuelingCandidates(t *testing.T) { nt := newNetwork(a, b, c) nt.cut(0, 2) - nt.send(Message{From: 0, To: 0, Type: msgHup}) - nt.send(Message{From: 2, To: 2, Type: msgHup}) + nt.send(pb.Message{From: 0, To: 0, Type: msgHup}) + nt.send(pb.Message{From: 2, To: 2, Type: msgHup}) nt.recover() - nt.send(Message{From: 2, To: 2, Type: msgHup}) + nt.send(pb.Message{From: 2, To: 2, Type: msgHup}) - wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1} + wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1} tests := []struct { sm *raft state stateType @@ -256,15 +258,15 @@ func TestCandidateConcede(t *testing.T) { tt := newNetwork(nil, nil, nil) tt.isolate(0) - tt.send(Message{From: 0, To: 0, Type: msgHup}) - tt.send(Message{From: 2, To: 2, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 2, To: 2, Type: msgHup}) // heal the partition tt.recover() data := []byte("force follower") // send a proposal to 2 to flush out a msgApp to 0 - tt.send(Message{From: 2, To: 2, Type: msgProp, Entries: []Entry{{Data: data}}}) + tt.send(pb.Message{From: 2, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: data}}}) a := tt.peers[0].(*raft) if g := a.state; g != stateFollower { @@ -273,7 +275,7 @@ func TestCandidateConcede(t *testing.T) { if g := a.Term; g != 1 { t.Errorf("term = %d, want %d", g, 1) } - wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}) + wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}) for i, p := range tt.peers { if sm, ok := p.(*raft); ok { l := ltoa(sm.raftLog) @@ -288,7 +290,7 @@ func TestCandidateConcede(t *testing.T) { func TestSingleNodeCandidate(t *testing.T) { tt := newNetwork(nil) - tt.send(Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) sm := tt.peers[0].(*raft) if sm.state != stateLeader { @@ -299,14 +301,14 @@ func TestSingleNodeCandidate(t *testing.T) { func TestOldMessages(t *testing.T) { tt := newNetwork(nil, nil, nil) // make 0 leader @ term 3 - tt.send(Message{From: 0, To: 0, Type: msgHup}) - tt.send(Message{From: 1, To: 1, Type: msgHup}) - tt.send(Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 1, To: 1, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) // pretend we're an old leader trying to make progress - tt.send(Message{From: 0, To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}) + tt.send(pb.Message{From: 0, To: 0, Type: msgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}}) l := &raftLog{ - ents: []Entry{ + ents: []pb.Entry{ {}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Type: Normal, Data: nil, Term: 2, Index: 2}, {Type: Normal, Data: nil, Term: 3, Index: 3}, }, @@ -340,7 +342,7 @@ func TestProposal(t *testing.T) { } for i, tt := range tests { - send := func(m Message) { + send := func(m pb.Message) { defer func() { // only recover is we expect it to panic so // panics we don't expect go up. @@ -357,12 +359,12 @@ func TestProposal(t *testing.T) { data := []byte("somedata") // promote 0 the leader - send(Message{From: 0, To: 0, Type: msgHup}) - send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: data}}}) + send(pb.Message{From: 0, To: 0, Type: msgHup}) + send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: data}}}) wantLog := newLog() if tt.success { - wantLog = &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2} + wantLog = &raftLog{ents: []pb.Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2} } base := ltoa(wantLog) for i, p := range tt.peers { @@ -391,12 +393,12 @@ func TestProposalByProxy(t *testing.T) { for i, tt := range tests { // promote 0 the leader - tt.send(Message{From: 0, To: 0, Type: msgHup}) + tt.send(pb.Message{From: 0, To: 0, Type: msgHup}) // propose via follower - tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}) + tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2} + wantLog := &raftLog{ents: []pb.Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2} base := ltoa(wantLog) for i, p := range tt.peers { if sm, ok := p.(*raft); ok { @@ -418,29 +420,29 @@ func TestProposalByProxy(t *testing.T) { func TestCommit(t *testing.T) { tests := []struct { matches []int64 - logs []Entry + logs []pb.Entry smTerm int64 w int64 }{ // single - {[]int64{1}, []Entry{{}, {Term: 1}}, 1, 1}, - {[]int64{1}, []Entry{{}, {Term: 1}}, 2, 0}, - {[]int64{2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, - {[]int64{1}, []Entry{{}, {Term: 2}}, 2, 1}, + {[]int64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1}, + {[]int64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0}, + {[]int64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, + {[]int64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1}, // odd - {[]int64{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, - {[]int64{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, - {[]int64{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, - {[]int64{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, + {[]int64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, + {[]int64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, + {[]int64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, + {[]int64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, // even - {[]int64{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, - {[]int64{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, - {[]int64{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, - {[]int64{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, - {[]int64{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, - {[]int64{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, + {[]int64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, + {[]int64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, + {[]int64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, + {[]int64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, + {[]int64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, + {[]int64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, } for i, tt := range tests { @@ -448,7 +450,7 @@ func TestCommit(t *testing.T) { for j := 0; j < len(tt.matches); j++ { prs[int64(j)] = &progress{tt.matches[j], tt.matches[j] + 1} } - sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, State: State{Term: tt.smTerm}} + sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, State: pb.State{Term: tt.smTerm}} sm.maybeCommit() if g := sm.raftLog.committed; g != tt.w { t.Errorf("#%d: committed = %d, want %d", i, g, tt.w) @@ -463,32 +465,32 @@ func TestCommit(t *testing.T) { // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry). func TestHandleMsgApp(t *testing.T) { tests := []struct { - m Message + m pb.Message wIndex int64 wCommit int64 wAccept bool }{ // Ensure 1 - {Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, false}, // previous log mismatch - {Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, false}, // previous log non-exist + {pb.Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, false}, // previous log mismatch + {pb.Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, false}, // previous log non-exist // Ensure 2 - {Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, true}, - {Message{Type: msgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []Entry{{Term: 2}}}, 1, 1, true}, - {Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []Entry{{Term: 2}, {Term: 2}}}, 4, 3, true}, - {Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []Entry{{Term: 2}}}, 3, 3, true}, - {Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []Entry{{Term: 2}}}, 2, 2, true}, + {pb.Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, true}, + {pb.Message{Type: msgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, true}, + {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, true}, + {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, true}, + {pb.Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, true}, // Ensure 3 - {Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 2}, 2, 2, true}, - {Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, true}, // commit upto min(commit, last) + {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 2}, 2, 2, true}, + {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, true}, // commit upto min(commit, last) } for i, tt := range tests { sm := &raft{ - state: stateFollower, - State: State{Term: 2}, - raftLog: &raftLog{committed: 0, ents: []Entry{{}, {Term: 1}, {Term: 2}}}, + state: stateFollower, + State: pb.State{Term: 2}, + raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}}, } sm.handleAppendEntries(tt.m) @@ -548,12 +550,12 @@ func TestRecvMsgVote(t *testing.T) { for i, tt := range tests { sm := &raft{ - state: tt.state, - State: State{Vote: tt.voteFor}, - raftLog: &raftLog{ents: []Entry{{}, {Term: 2}, {Term: 2}}}, + state: tt.state, + State: pb.State{Vote: tt.voteFor}, + raftLog: &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}, } - sm.Step(Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term}) + sm.Step(pb.Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term}) msgs := sm.ReadMessages() if g := len(msgs); g != 1 { @@ -624,7 +626,7 @@ func TestConf(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.Step(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Type: AddNode}}}) + sm.Step(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Type: AddNode}}}) if sm.raftLog.lastIndex() != 2 { t.Errorf("lastindex = %d, want %d", sm.raftLog.lastIndex(), 1) } @@ -638,7 +640,7 @@ func TestConf(t *testing.T) { // deny the second configuration change request if there is a pending one paniced := false defer func() { recover(); paniced = true }() - sm.Step(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Type: AddNode}}}) + sm.Step(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Type: AddNode}}}) if !paniced { t.Errorf("expected panic") } @@ -661,7 +663,7 @@ func TestConfChangeLeader(t *testing.T) { for i, tt := range tests { sm := newRaft(0, []int64{0}) - sm.raftLog = &raftLog{ents: []Entry{{}, {Type: tt.et}}} + sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Type: tt.et}}} sm.becomeCandidate() sm.becomeLeader() @@ -701,7 +703,7 @@ func TestAllServerStepdown(t *testing.T) { } for j, msgType := range tmsgTypes { - sm.Step(Message{From: 1, Type: msgType, Term: tterm, LogTerm: tterm}) + sm.Step(pb.Message{From: 1, Type: msgType, Term: tterm, LogTerm: tterm}) if sm.state != tt.wstate { t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate) @@ -738,11 +740,11 @@ func TestLeaderAppResp(t *testing.T) { // sm term is 1 after it becomes the leader. // thus the last log term must be 1 to be committed. sm := newRaft(0, []int64{0, 1, 2}) - sm.raftLog = &raftLog{ents: []Entry{{}, {Term: 0}, {Term: 1}}} + sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}} sm.becomeCandidate() sm.becomeLeader() sm.ReadMessages() - sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.Term}) + sm.Step(pb.Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.Term}) msgs := sm.ReadMessages() if len(msgs) != tt.wmsgNum { @@ -773,10 +775,10 @@ func TestRecvMsgBeat(t *testing.T) { for i, tt := range tests { sm := newRaft(0, []int64{0, 1, 2}) - sm.raftLog = &raftLog{ents: []Entry{{}, {Term: 0}, {Term: 1}}} + sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}} sm.Term = 1 sm.state = tt.state - sm.Step(Message{From: 0, To: 0, Type: msgBeat}) + sm.Step(pb.Message{From: 0, To: 0, Type: msgBeat}) msgs := sm.ReadMessages() if len(msgs) != tt.wMsg { @@ -791,7 +793,7 @@ func TestRecvMsgBeat(t *testing.T) { } func TestRestore(t *testing.T) { - s := Snapshot{ + s := pb.Snapshot{ Index: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1, Nodes: []int64{0, 1, 2}, @@ -825,7 +827,7 @@ func TestRestore(t *testing.T) { } func TestProvideSnap(t *testing.T) { - s := Snapshot{ + s := pb.Snapshot{ Index: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1, Nodes: []int64{0, 1}, @@ -838,7 +840,7 @@ func TestProvideSnap(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.Step(Message{From: 0, To: 0, Type: msgBeat}) + sm.Step(pb.Message{From: 0, To: 0, Type: msgBeat}) msgs := sm.ReadMessages() if len(msgs) != 1 { t.Errorf("len(msgs) = %d, want 1", len(msgs)) @@ -852,7 +854,7 @@ func TestProvideSnap(t *testing.T) { // node 1 needs a snapshot sm.prs[1].next = sm.raftLog.offset - sm.Step(Message{From: 1, To: 0, Type: msgAppResp, Index: -1}) + sm.Step(pb.Message{From: 1, To: 0, Type: msgAppResp, Index: -1}) msgs = sm.ReadMessages() if len(msgs) != 1 { t.Errorf("len(msgs) = %d, want 1", len(msgs)) @@ -864,12 +866,12 @@ func TestProvideSnap(t *testing.T) { } func TestRestoreFromSnapMsg(t *testing.T) { - s := Snapshot{ + s := pb.Snapshot{ Index: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1, Nodes: []int64{0, 1}, } - m := Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s} + m := pb.Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s} sm := newRaft(1, []int64{0, 1}) sm.Step(m) @@ -881,18 +883,18 @@ func TestRestoreFromSnapMsg(t *testing.T) { func TestSlowNodeRestore(t *testing.T) { nt := newNetwork(nil, nil, nil) - nt.send(Message{From: 0, To: 0, Type: msgHup}) + nt.send(pb.Message{From: 0, To: 0, Type: msgHup}) nt.isolate(2) for j := 0; j < defaultCompactThreshold+1; j++ { - nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}}) + nt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{}}}) } lead := nt.peers[0].(*raft) lead.nextEnts() lead.compact(nil) nt.recover() - nt.send(Message{From: 0, To: 0, Type: msgBeat}) + nt.send(pb.Message{From: 0, To: 0, Type: msgBeat}) follower := nt.peers[2].(*raft) if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) { @@ -900,16 +902,16 @@ func TestSlowNodeRestore(t *testing.T) { } committed := follower.raftLog.lastIndex() - nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}}) + nt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{}}}) if follower.raftLog.committed != committed+1 { t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, committed+1) } } func ents(terms ...int64) *raft { - ents := []Entry{{}} + ents := []pb.Entry{{}} for _, term := range terms { - ents = append(ents, Entry{Term: term}) + ents = append(ents, pb.Entry{Term: term}) } sm := &raft{raftLog: &raftLog{ents: ents}} @@ -961,7 +963,7 @@ func newNetwork(peers ...Interface) *network { } } -func (nw *network) send(msgs ...Message) { +func (nw *network) send(msgs ...pb.Message) { for len(msgs) > 0 { m := msgs[0] p := nw.peers[m.To] @@ -998,8 +1000,8 @@ func (nw *network) recover() { nw.ignorem = make(map[int64]bool) } -func (nw *network) filter(msgs []Message) []Message { - mm := make([]Message, 0) +func (nw *network) filter(msgs []pb.Message) []pb.Message { + mm := make([]pb.Message, 0) for _, m := range msgs { if nw.ignorem[m.Type] { continue @@ -1025,7 +1027,7 @@ type connem struct { type blackHole struct{} -func (blackHole) Step(Message) error { return nil } -func (blackHole) ReadMessages() []Message { return nil } +func (blackHole) Step(pb.Message) error { return nil } +func (blackHole) ReadMessages() []pb.Message { return nil } var nopStepper = &blackHole{} diff --git a/raft/genproto.sh b/raft/raftpb/genproto.sh similarity index 100% rename from raft/genproto.sh rename to raft/raftpb/genproto.sh diff --git a/raft/protos.pb.go b/raft/raftpb/raft.pb.go similarity index 69% rename from raft/protos.pb.go rename to raft/raftpb/raft.pb.go index bd3a7ec19..3390cb264 100644 --- a/raft/protos.pb.go +++ b/raft/raftpb/raft.pb.go @@ -1,26 +1,26 @@ // Code generated by protoc-gen-gogo. -// source: protos.proto +// source: raft.proto // DO NOT EDIT! /* - Package raft is a generated protocol buffer package. + Package raftis a generated protocol buffer package. It is generated from these files: - protos.proto - state.proto + raft.proto It has these top-level messages: Entry Snapshot Message + State */ -package raft +package raftpb import proto "code.google.com/p/gogoprotobuf/proto" import json "encoding/json" import math "math" -// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb" +// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo. import io "io" import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto" @@ -56,22 +56,34 @@ func (m *Snapshot) String() string { return proto.CompactTextString(m) } func (*Snapshot) ProtoMessage() {} type Message struct { - Type int64 `protobuf:"varint,1,req,name=type" json:"type"` - To int64 `protobuf:"varint,2,req,name=to" json:"to"` - From int64 `protobuf:"varint,3,req,name=from" json:"from"` - Term int64 `protobuf:"varint,4,req,name=term" json:"term"` - LogTerm int64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"` - Index int64 `protobuf:"varint,6,req,name=index" json:"index"` - Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"` - Commit int64 `protobuf:"varint,8,req,name=commit" json:"commit"` - Snapshot Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"` - XXX_unrecognized []byte `json:"-"` + Type int64 `protobuf:"varint,1,req,name=type" json:"type"` + To int64 `protobuf:"varint,2,req,name=to" json:"to"` + From int64 `protobuf:"varint,3,req,name=from" json:"from"` + Term int64 `protobuf:"varint,4,req,name=term" json:"term"` + LogTerm int64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"` + Index int64 `protobuf:"varint,6,req,name=index" json:"index"` + Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"` + Commit int64 `protobuf:"varint,8,req,name=commit" json:"commit"` + Snapshot Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"` + XXX_unrecognized []byte `json:"-"` } func (m *Message) Reset() { *m = Message{} } func (m *Message) String() string { return proto.CompactTextString(m) } func (*Message) ProtoMessage() {} +type State struct { + Term int64 `protobuf:"varint,1,req,name=term" json:"term"` + Vote int64 `protobuf:"varint,2,req,name=vote" json:"vote"` + Commit int64 `protobuf:"varint,3,req,name=commit" json:"commit"` + LastIndex int64 `protobuf:"varint,4,req,name=lastIndex" json:"lastIndex"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *State) Reset() { *m = State{} } +func (m *State) String() string { return proto.CompactTextString(m) } +func (*State) ProtoMessage() {} + func init() { } func (m *Entry) Unmarshal(data []byte) error { @@ -503,15 +515,117 @@ func (m *Message) Unmarshal(data []byte) error { } return nil } +func (m *State) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Term |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Vote |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Commit |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.LastIndex |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Entry) Size() (n int) { var l int _ = l - n += 1 + sovProtos(uint64(m.Type)) - n += 1 + sovProtos(uint64(m.Term)) - n += 1 + sovProtos(uint64(m.Index)) + n += 1 + sovRaft(uint64(m.Type)) + n += 1 + sovRaft(uint64(m.Term)) + n += 1 + sovRaft(uint64(m.Index)) l = len(m.Data) - n += 1 + l + sovProtos(uint64(l)) - n += 1 + sovProtos(uint64(m.Id)) + n += 1 + l + sovRaft(uint64(l)) + n += 1 + sovRaft(uint64(m.Id)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -521,14 +635,14 @@ func (m *Snapshot) Size() (n int) { var l int _ = l l = len(m.Data) - n += 1 + l + sovProtos(uint64(l)) + n += 1 + l + sovRaft(uint64(l)) if len(m.Nodes) > 0 { for _, e := range m.Nodes { - n += 1 + sovProtos(uint64(e)) + n += 1 + sovRaft(uint64(e)) } } - n += 1 + sovProtos(uint64(m.Index)) - n += 1 + sovProtos(uint64(m.Term)) + n += 1 + sovRaft(uint64(m.Index)) + n += 1 + sovRaft(uint64(m.Term)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -537,28 +651,40 @@ func (m *Snapshot) Size() (n int) { func (m *Message) Size() (n int) { var l int _ = l - n += 1 + sovProtos(uint64(m.Type)) - n += 1 + sovProtos(uint64(m.To)) - n += 1 + sovProtos(uint64(m.From)) - n += 1 + sovProtos(uint64(m.Term)) - n += 1 + sovProtos(uint64(m.LogTerm)) - n += 1 + sovProtos(uint64(m.Index)) + n += 1 + sovRaft(uint64(m.Type)) + n += 1 + sovRaft(uint64(m.To)) + n += 1 + sovRaft(uint64(m.From)) + n += 1 + sovRaft(uint64(m.Term)) + n += 1 + sovRaft(uint64(m.LogTerm)) + n += 1 + sovRaft(uint64(m.Index)) if len(m.Entries) > 0 { for _, e := range m.Entries { l = e.Size() - n += 1 + l + sovProtos(uint64(l)) + n += 1 + l + sovRaft(uint64(l)) } } - n += 1 + sovProtos(uint64(m.Commit)) + n += 1 + sovRaft(uint64(m.Commit)) l = m.Snapshot.Size() - n += 1 + l + sovProtos(uint64(l)) + n += 1 + l + sovRaft(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} +func (m *State) Size() (n int) { + var l int + _ = l + n += 1 + sovRaft(uint64(m.Term)) + n += 1 + sovRaft(uint64(m.Vote)) + n += 1 + sovRaft(uint64(m.Commit)) + n += 1 + sovRaft(uint64(m.LastIndex)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } return n } -func sovProtos(x uint64) (n int) { +func sovRaft(x uint64) (n int) { for { n++ x >>= 7 @@ -568,8 +694,8 @@ func sovProtos(x uint64) (n int) { } return n } -func sozProtos(x uint64) (n int) { - return sovProtos(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +func sozRaft(x uint64) (n int) { + return sovRaft(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } func (m *Entry) Marshal() (data []byte, err error) { size := m.Size() @@ -588,20 +714,20 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) { _ = l data[i] = 0x8 i++ - i = encodeVarintProtos(data, i, uint64(m.Type)) + i = encodeVarintRaft(data, i, uint64(m.Type)) data[i] = 0x10 i++ - i = encodeVarintProtos(data, i, uint64(m.Term)) + i = encodeVarintRaft(data, i, uint64(m.Term)) data[i] = 0x18 i++ - i = encodeVarintProtos(data, i, uint64(m.Index)) + i = encodeVarintRaft(data, i, uint64(m.Index)) data[i] = 0x22 i++ - i = encodeVarintProtos(data, i, uint64(len(m.Data))) + i = encodeVarintRaft(data, i, uint64(len(m.Data))) i += copy(data[i:], m.Data) data[i] = 0x28 i++ - i = encodeVarintProtos(data, i, uint64(m.Id)) + i = encodeVarintRaft(data, i, uint64(m.Id)) if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } @@ -624,21 +750,21 @@ func (m *Snapshot) MarshalTo(data []byte) (n int, err error) { _ = l data[i] = 0xa i++ - i = encodeVarintProtos(data, i, uint64(len(m.Data))) + i = encodeVarintRaft(data, i, uint64(len(m.Data))) i += copy(data[i:], m.Data) if len(m.Nodes) > 0 { for _, num := range m.Nodes { data[i] = 0x10 i++ - i = encodeVarintProtos(data, i, uint64(num)) + i = encodeVarintRaft(data, i, uint64(num)) } } data[i] = 0x18 i++ - i = encodeVarintProtos(data, i, uint64(m.Index)) + i = encodeVarintRaft(data, i, uint64(m.Index)) data[i] = 0x20 i++ - i = encodeVarintProtos(data, i, uint64(m.Term)) + i = encodeVarintRaft(data, i, uint64(m.Term)) if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } @@ -661,27 +787,27 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) { _ = l data[i] = 0x8 i++ - i = encodeVarintProtos(data, i, uint64(m.Type)) + i = encodeVarintRaft(data, i, uint64(m.Type)) data[i] = 0x10 i++ - i = encodeVarintProtos(data, i, uint64(m.To)) + i = encodeVarintRaft(data, i, uint64(m.To)) data[i] = 0x18 i++ - i = encodeVarintProtos(data, i, uint64(m.From)) + i = encodeVarintRaft(data, i, uint64(m.From)) data[i] = 0x20 i++ - i = encodeVarintProtos(data, i, uint64(m.Term)) + i = encodeVarintRaft(data, i, uint64(m.Term)) data[i] = 0x28 i++ - i = encodeVarintProtos(data, i, uint64(m.LogTerm)) + i = encodeVarintRaft(data, i, uint64(m.LogTerm)) data[i] = 0x30 i++ - i = encodeVarintProtos(data, i, uint64(m.Index)) + i = encodeVarintRaft(data, i, uint64(m.Index)) if len(m.Entries) > 0 { for _, msg := range m.Entries { data[i] = 0x3a i++ - i = encodeVarintProtos(data, i, uint64(msg.Size())) + i = encodeVarintRaft(data, i, uint64(msg.Size())) n, err := msg.MarshalTo(data[i:]) if err != nil { return 0, err @@ -691,10 +817,10 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) { } data[i] = 0x40 i++ - i = encodeVarintProtos(data, i, uint64(m.Commit)) + i = encodeVarintRaft(data, i, uint64(m.Commit)) data[i] = 0x4a i++ - i = encodeVarintProtos(data, i, uint64(m.Snapshot.Size())) + i = encodeVarintRaft(data, i, uint64(m.Snapshot.Size())) n1, err := m.Snapshot.MarshalTo(data[i:]) if err != nil { return 0, err @@ -705,7 +831,39 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) { } return i, nil } -func encodeFixed64Protos(data []byte, offset int, v uint64) int { +func (m *State) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *State) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRaft(data, i, uint64(m.Term)) + data[i] = 0x10 + i++ + i = encodeVarintRaft(data, i, uint64(m.Vote)) + data[i] = 0x18 + i++ + i = encodeVarintRaft(data, i, uint64(m.Commit)) + data[i] = 0x20 + i++ + i = encodeVarintRaft(data, i, uint64(m.LastIndex)) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} +func encodeFixed64Raft(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) data[offset+2] = uint8(v >> 16) @@ -716,14 +874,14 @@ func encodeFixed64Protos(data []byte, offset int, v uint64) int { data[offset+7] = uint8(v >> 56) return offset + 8 } -func encodeFixed32Protos(data []byte, offset int, v uint32) int { +func encodeFixed32Raft(data []byte, offset int, v uint32) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) data[offset+2] = uint8(v >> 16) data[offset+3] = uint8(v >> 24) return offset + 4 } -func encodeVarintProtos(data []byte, offset int, v uint64) int { +func encodeVarintRaft(data []byte, offset int, v uint64) int { for v >= 1<<7 { data[offset] = uint8(v&0x7f | 0x80) v >>= 7 diff --git a/raft/protos.proto b/raft/raftpb/raft.proto similarity index 83% rename from raft/protos.proto rename to raft/raftpb/raft.proto index 67e369db7..eaf4ca34c 100644 --- a/raft/protos.proto +++ b/raft/raftpb/raft.proto @@ -1,4 +1,4 @@ -package raft; +package raftpb; import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto"; @@ -33,3 +33,10 @@ message Message { required int64 commit = 8 [(gogoproto.nullable) = false]; required Snapshot snapshot = 9 [(gogoproto.nullable) = false]; } + +message State { + required int64 term = 1 [(gogoproto.nullable) = false]; + required int64 vote = 2 [(gogoproto.nullable) = false]; + required int64 commit = 3 [(gogoproto.nullable) = false]; + required int64 lastIndex = 4 [(gogoproto.nullable) = false]; +} diff --git a/raft/state.pb.go b/raft/state.pb.go deleted file mode 100644 index 5824dc7dd..000000000 --- a/raft/state.pb.go +++ /dev/null @@ -1,221 +0,0 @@ -// Code generated by protoc-gen-gogo. -// source: state.proto -// DO NOT EDIT! - -package raft - -import proto "code.google.com/p/gogoprotobuf/proto" -import json "encoding/json" -import math "math" - -// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb" - -import io1 "io" -import code_google_com_p_gogoprotobuf_proto1 "code.google.com/p/gogoprotobuf/proto" - -// Reference proto, json, and math imports to suppress error if they are not otherwise used. -var _ = proto.Marshal -var _ = &json.SyntaxError{} -var _ = math.Inf - -type State struct { - Term int64 `protobuf:"varint,1,req,name=term" json:"term"` - Vote int64 `protobuf:"varint,2,req,name=vote" json:"vote"` - Commit int64 `protobuf:"varint,3,req,name=commit" json:"commit"` - LastIndex int64 `protobuf:"varint,4,req,name=lastIndex" json:"lastIndex"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *State) Reset() { *m = State{} } -func (m *State) String() string { return proto.CompactTextString(m) } -func (*State) ProtoMessage() {} - -func init() { -} -func (m *State) Unmarshal(data []byte) error { - l := len(data) - index := 0 - for index < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if index >= l { - return io1.ErrUnexpectedEOF - } - b := data[index] - index++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto1.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io1.ErrUnexpectedEOF - } - b := data[index] - index++ - m.Term |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto1.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io1.ErrUnexpectedEOF - } - b := data[index] - index++ - m.Vote |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto1.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io1.ErrUnexpectedEOF - } - b := data[index] - index++ - m.Commit |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 4: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto1.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io1.ErrUnexpectedEOF - } - b := data[index] - index++ - m.LastIndex |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - index -= sizeOfWire - skippy, err := code_google_com_p_gogoprotobuf_proto1.Skip(data[index:]) - if err != nil { - return err - } - if (index + skippy) > l { - return io1.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) - index += skippy - } - } - return nil -} -func (m *State) Size() (n int) { - var l int - _ = l - n += 1 + sovState(uint64(m.Term)) - n += 1 + sovState(uint64(m.Vote)) - n += 1 + sovState(uint64(m.Commit)) - n += 1 + sovState(uint64(m.LastIndex)) - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - -func sovState(x uint64) (n int) { - for { - n++ - x >>= 7 - if x == 0 { - break - } - } - return n -} -func sozState(x uint64) (n int) { - return sovState(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (m *State) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *State) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - data[i] = 0x8 - i++ - i = encodeVarintState(data, i, uint64(m.Term)) - data[i] = 0x10 - i++ - i = encodeVarintState(data, i, uint64(m.Vote)) - data[i] = 0x18 - i++ - i = encodeVarintState(data, i, uint64(m.Commit)) - data[i] = 0x20 - i++ - i = encodeVarintState(data, i, uint64(m.LastIndex)) - if m.XXX_unrecognized != nil { - i += copy(data[i:], m.XXX_unrecognized) - } - return i, nil -} -func encodeFixed64State(data []byte, offset int, v uint64) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - data[offset+4] = uint8(v >> 32) - data[offset+5] = uint8(v >> 40) - data[offset+6] = uint8(v >> 48) - data[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32State(data []byte, offset int, v uint32) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - return offset + 4 -} -func encodeVarintState(data []byte, offset int, v uint64) int { - for v >= 1<<7 { - data[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - data[offset] = uint8(v) - return offset + 1 -} diff --git a/raft/state.proto b/raft/state.proto deleted file mode 100644 index bfdfcab1d..000000000 --- a/raft/state.proto +++ /dev/null @@ -1,15 +0,0 @@ -package raft; - -import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto"; - -option (gogoproto.marshaler_all) = true; -option (gogoproto.sizer_all) = true; -option (gogoproto.unmarshaler_all) = true; -option (gogoproto.goproto_getters_all) = false; - -message State { - required int64 term = 1 [(gogoproto.nullable) = false]; - required int64 vote = 2 [(gogoproto.nullable) = false]; - required int64 commit = 3 [(gogoproto.nullable) = false]; - required int64 lastIndex = 4 [(gogoproto.nullable) = false]; -}