raft: move protobufs into raftpb

This commit is contained in:
Blake Mizerany 2014-08-27 18:53:18 -07:00 committed by Yicheng Qin
parent 59115b85f7
commit e8e588c67b
13 changed files with 449 additions and 503 deletions

View File

@ -13,7 +13,7 @@ import (
"code.google.com/p/go.net/context" "code.google.com/p/go.net/context"
"github.com/coreos/etcd/elog" "github.com/coreos/etcd/elog"
etcdserver "github.com/coreos/etcd/etcdserver2" etcdserver "github.com/coreos/etcd/etcdserver2"
"github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
) )
@ -66,7 +66,7 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R
if err != nil { if err != nil {
elog.TODO() elog.TODO()
} }
var m raft.Message var m raftpb.Message
if err := m.Unmarshal(b); err != nil { if err := m.Unmarshal(b); err != nil {
elog.TODO() elog.TODO()
} }

View File

@ -7,13 +7,14 @@ import (
"code.google.com/p/go.net/context" "code.google.com/p/go.net/context"
"github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/coreos/etcd/wait" "github.com/coreos/etcd/wait"
) )
var ErrUnknownMethod = errors.New("etcdserver: unknown method") var ErrUnknownMethod = errors.New("etcdserver: unknown method")
type SendFunc func(m []raft.Message) type SendFunc func(m []raftpb.Message)
type Response struct { type Response struct {
// The last seen term raft was at when this request was built. // The last seen term raft was at when this request was built.
@ -44,7 +45,7 @@ type Server struct {
// Save specifies the save function for saving ents to stable storage. // Save specifies the save function for saving ents to stable storage.
// Save MUST block until st and ents are on stable storage. If Send is // Save MUST block until st and ents are on stable storage. If Send is
// nil, Server will panic. // nil, Server will panic.
Save func(st raft.State, ents []raft.Entry) Save func(st raftpb.State, ents []raftpb.Entry)
} }
func (s *Server) init() { s.w = wait.New() } func (s *Server) init() { s.w = wait.New() }
@ -116,7 +117,7 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
} }
// apply interprets r as a call to store.X and returns an Response interpreted from store.Event // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
func (s *Server) apply(ctx context.Context, e raft.Entry) (*store.Event, error) { func (s *Server) apply(ctx context.Context, e raftpb.Entry) (*store.Event, error) {
var r Request var r Request
if err := r.Unmarshal(e.Data); err != nil { if err := r.Unmarshal(e.Data); err != nil {
return nil, err return nil, err

View File

@ -1,11 +1,15 @@
package raft package raft
import "code.google.com/p/go.net/context" import (
"code.google.com/p/go.net/context"
func applyToStore(ents []Entry) {} pb "github.com/coreos/etcd/raft/raftpb"
func sendMessages(msgs []Message) {} )
func saveStateToDisk(st State) {}
func saveToDisk(ents []Entry) {} func applyToStore(ents []pb.Entry) {}
func sendMessages(msgs []pb.Message) {}
func saveStateToDisk(st pb.State) {}
func saveToDisk(ents []pb.Entry) {}
func Example_Node() { func Example_Node() {
n := Start(context.Background(), 0, nil) n := Start(context.Background(), 0, nil)
@ -13,11 +17,11 @@ func Example_Node() {
// stuff to n happens in other goroutines // stuff to n happens in other goroutines
// the last known state // the last known state
var prev State var prev pb.State
for { for {
// ReadState blocks until there is new state ready. // ReadState blocks until there is new state ready.
rd := <-n.Ready() rd := <-n.Ready()
if !prev.Equal(rd.State) { if !isStateEqual(prev, rd.State) {
saveStateToDisk(rd.State) saveStateToDisk(rd.State)
prev = rd.State prev = rd.State
} }

View File

@ -1,6 +1,10 @@
package raft package raft
import "fmt" import (
"fmt"
pb "github.com/coreos/etcd/raft/raftpb"
)
const ( const (
Normal int64 = iota Normal int64 = iota
@ -14,18 +18,18 @@ const (
defaultCompactThreshold = 10000 defaultCompactThreshold = 10000
) )
func (e *Entry) isConfig() bool { func isConfig(e pb.Entry) bool {
return e.Type == AddNode || e.Type == RemoveNode return e.Type == AddNode || e.Type == RemoveNode
} }
type raftLog struct { type raftLog struct {
ents []Entry ents []pb.Entry
unstable int64 unstable int64
committed int64 committed int64
applied int64 applied int64
offset int64 offset int64
snapshot Snapshot snapshot pb.Snapshot
unstableSnapshot Snapshot unstableSnapshot pb.Snapshot
// want a compact after the number of entries exceeds the threshold // want a compact after the number of entries exceeds the threshold
// TODO(xiangli) size might be a better criteria // TODO(xiangli) size might be a better criteria
@ -34,7 +38,7 @@ type raftLog struct {
func newLog() *raftLog { func newLog() *raftLog {
return &raftLog{ return &raftLog{
ents: make([]Entry, 1), ents: make([]pb.Entry, 1),
unstable: 1, unstable: 1,
committed: 0, committed: 0,
applied: 0, applied: 0,
@ -50,7 +54,7 @@ func (l *raftLog) String() string {
return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents)) return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents))
} }
func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...Entry) bool { func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...pb.Entry) bool {
if l.matchTerm(index, logTerm) { if l.matchTerm(index, logTerm) {
from := index + 1 from := index + 1
ci := l.findConflict(from, ents) ci := l.findConflict(from, ents)
@ -69,13 +73,13 @@ func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...Entry) bo
return false return false
} }
func (l *raftLog) append(after int64, ents ...Entry) int64 { func (l *raftLog) append(after int64, ents ...pb.Entry) int64 {
l.ents = append(l.slice(l.offset, after+1), ents...) l.ents = append(l.slice(l.offset, after+1), ents...)
l.unstable = min(l.unstable, after+1) l.unstable = min(l.unstable, after+1)
return l.lastIndex() return l.lastIndex()
} }
func (l *raftLog) findConflict(from int64, ents []Entry) int64 { func (l *raftLog) findConflict(from int64, ents []pb.Entry) int64 {
for i, ne := range ents { for i, ne := range ents {
if oe := l.at(from + int64(i)); oe == nil || oe.Term != ne.Term { if oe := l.at(from + int64(i)); oe == nil || oe.Term != ne.Term {
return from + int64(i) return from + int64(i)
@ -84,12 +88,12 @@ func (l *raftLog) findConflict(from int64, ents []Entry) int64 {
return -1 return -1
} }
func (l *raftLog) unstableEnts() []Entry { func (l *raftLog) unstableEnts() []pb.Entry {
ents := l.entries(l.unstable) ents := l.entries(l.unstable)
if ents == nil { if ents == nil {
return nil return nil
} }
cpy := make([]Entry, len(ents)) cpy := make([]pb.Entry, len(ents))
copy(cpy, ents) copy(cpy, ents)
return cpy return cpy
} }
@ -100,13 +104,13 @@ func (l *raftLog) resetUnstable() {
// nextEnts returns all the available entries for execution. // nextEnts returns all the available entries for execution.
// all the returned entries will be marked as applied. // all the returned entries will be marked as applied.
func (l *raftLog) nextEnts() (ents []Entry) { func (l *raftLog) nextEnts() (ents []pb.Entry) {
if l.committed > l.applied { if l.committed > l.applied {
ents := l.slice(l.applied+1, l.committed+1) ents := l.slice(l.applied+1, l.committed+1)
if ents == nil { if ents == nil {
return nil return nil
} }
cpy := make([]Entry, len(ents)) cpy := make([]pb.Entry, len(ents))
copy(cpy, ents) copy(cpy, ents)
return cpy return cpy
} }
@ -130,7 +134,7 @@ func (l *raftLog) term(i int64) int64 {
return -1 return -1
} }
func (l *raftLog) entries(i int64) []Entry { func (l *raftLog) entries(i int64) []pb.Entry {
// never send out the first entry // never send out the first entry
// first entry is only used for matching // first entry is only used for matching
// prevLogTerm // prevLogTerm
@ -176,15 +180,15 @@ func (l *raftLog) compact(i int64) int64 {
} }
func (l *raftLog) snap(d []byte, index, term int64, nodes []int64) { func (l *raftLog) snap(d []byte, index, term int64, nodes []int64) {
l.snapshot = Snapshot{d, nodes, index, term, nil} l.snapshot = pb.Snapshot{d, nodes, index, term, nil}
} }
func (l *raftLog) shouldCompact() bool { func (l *raftLog) shouldCompact() bool {
return (l.applied - l.offset) > l.compactThreshold return (l.applied - l.offset) > l.compactThreshold
} }
func (l *raftLog) restore(s Snapshot) { func (l *raftLog) restore(s pb.Snapshot) {
l.ents = []Entry{{Term: s.Term}} l.ents = []pb.Entry{{Term: s.Term}}
l.unstable = s.Index + 1 l.unstable = s.Index + 1
l.committed = s.Index l.committed = s.Index
l.applied = s.Index l.applied = s.Index
@ -192,7 +196,7 @@ func (l *raftLog) restore(s Snapshot) {
l.snapshot = s l.snapshot = s
} }
func (l *raftLog) at(i int64) *Entry { func (l *raftLog) at(i int64) *pb.Entry {
if l.isOutOfBounds(i) { if l.isOutOfBounds(i) {
return nil return nil
} }
@ -200,7 +204,7 @@ func (l *raftLog) at(i int64) *Entry {
} }
// slice get a slice of log entries from lo through hi-1, inclusive. // slice get a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo int64, hi int64) []Entry { func (l *raftLog) slice(lo int64, hi int64) []pb.Entry {
if lo >= hi { if lo >= hi {
return nil return nil
} }

View File

@ -3,6 +3,8 @@ package raft
import ( import (
"reflect" "reflect"
"testing" "testing"
pb "github.com/coreos/etcd/raft/raftpb"
) )
// TestAppend ensures: // TestAppend ensures:
@ -11,43 +13,43 @@ import (
// follow it // follow it
// 2.Append any new entries not already in the log // 2.Append any new entries not already in the log
func TestAppend(t *testing.T) { func TestAppend(t *testing.T) {
previousEnts := []Entry{{Term: 1}, {Term: 2}} previousEnts := []pb.Entry{{Term: 1}, {Term: 2}}
previousUnstable := int64(3) previousUnstable := int64(3)
tests := []struct { tests := []struct {
after int64 after int64
ents []Entry ents []pb.Entry
windex int64 windex int64
wents []Entry wents []pb.Entry
wunstable int64 wunstable int64
}{ }{
{ {
2, 2,
[]Entry{}, []pb.Entry{},
2, 2,
[]Entry{{Term: 1}, {Term: 2}}, []pb.Entry{{Term: 1}, {Term: 2}},
3, 3,
}, },
{ {
2, 2,
[]Entry{{Term: 2}}, []pb.Entry{{Term: 2}},
3, 3,
[]Entry{{Term: 1}, {Term: 2}, {Term: 2}}, []pb.Entry{{Term: 1}, {Term: 2}, {Term: 2}},
3, 3,
}, },
// conflicts with index 1 // conflicts with index 1
{ {
0, 0,
[]Entry{{Term: 2}}, []pb.Entry{{Term: 2}},
1, 1,
[]Entry{{Term: 2}}, []pb.Entry{{Term: 2}},
1, 1,
}, },
// conflicts with index 2 // conflicts with index 2
{ {
1, 1,
[]Entry{{Term: 3}, {Term: 3}}, []pb.Entry{{Term: 3}, {Term: 3}},
3, 3,
[]Entry{{Term: 1}, {Term: 3}, {Term: 3}}, []pb.Entry{{Term: 1}, {Term: 3}, {Term: 3}},
2, 2,
}, },
} }
@ -77,7 +79,7 @@ func TestCompactionSideEffects(t *testing.T) {
raftLog := newLog() raftLog := newLog()
for i = 0; i < lastIndex; i++ { for i = 0; i < lastIndex; i++ {
raftLog.append(int64(i), Entry{Term: int64(i + 1), Index: int64(i + 1)}) raftLog.append(int64(i), pb.Entry{Term: int64(i + 1), Index: int64(i + 1)})
} }
raftLog.compact(500) raftLog.compact(500)
@ -107,7 +109,7 @@ func TestCompactionSideEffects(t *testing.T) {
} }
prev := raftLog.lastIndex() prev := raftLog.lastIndex()
raftLog.append(raftLog.lastIndex(), Entry{Term: raftLog.lastIndex() + 1}) raftLog.append(raftLog.lastIndex(), pb.Entry{Term: raftLog.lastIndex() + 1})
if raftLog.lastIndex() != prev+1 { if raftLog.lastIndex() != prev+1 {
t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1) t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1)
} }
@ -119,10 +121,10 @@ func TestCompactionSideEffects(t *testing.T) {
} }
func TestUnstableEnts(t *testing.T) { func TestUnstableEnts(t *testing.T) {
previousEnts := []Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
tests := []struct { tests := []struct {
unstable int64 unstable int64
wents []Entry wents []pb.Entry
wunstable int64 wunstable int64
}{ }{
{3, nil, 3}, {3, nil, 3},
@ -171,7 +173,7 @@ func TestCompaction(t *testing.T) {
raftLog := newLog() raftLog := newLog()
for i := 0; i < tt.app; i++ { for i := 0; i < tt.app; i++ {
raftLog.append(int64(i), Entry{}) raftLog.append(int64(i), pb.Entry{})
} }
for j := 0; j < len(tt.compact); j++ { for j := 0; j < len(tt.compact); j++ {
@ -188,12 +190,12 @@ func TestLogRestore(t *testing.T) {
var i int64 var i int64
raftLog := newLog() raftLog := newLog()
for i = 0; i < 100; i++ { for i = 0; i < 100; i++ {
raftLog.append(i, Entry{Term: i + 1}) raftLog.append(i, pb.Entry{Term: i + 1})
} }
index := int64(1000) index := int64(1000)
term := int64(1000) term := int64(1000)
raftLog.restore(Snapshot{Index: index, Term: term}) raftLog.restore(pb.Snapshot{Index: index, Term: term})
// only has the guard entry // only has the guard entry
if len(raftLog.ents) != 1 { if len(raftLog.ents) != 1 {
@ -219,7 +221,7 @@ func TestLogRestore(t *testing.T) {
func TestIsOutOfBounds(t *testing.T) { func TestIsOutOfBounds(t *testing.T) {
offset := int64(100) offset := int64(100)
num := int64(100) num := int64(100)
l := &raftLog{offset: offset, ents: make([]Entry, num)} l := &raftLog{offset: offset, ents: make([]pb.Entry, num)}
tests := []struct { tests := []struct {
index int64 index int64
@ -247,17 +249,17 @@ func TestAt(t *testing.T) {
l := &raftLog{offset: offset} l := &raftLog{offset: offset}
for i = 0; i < num; i++ { for i = 0; i < num; i++ {
l.ents = append(l.ents, Entry{Term: i}) l.ents = append(l.ents, pb.Entry{Term: i})
} }
tests := []struct { tests := []struct {
index int64 index int64
w *Entry w *pb.Entry
}{ }{
{offset - 1, nil}, {offset - 1, nil},
{offset, &Entry{Term: 0}}, {offset, &pb.Entry{Term: 0}},
{offset + num/2, &Entry{Term: num / 2}}, {offset + num/2, &pb.Entry{Term: num / 2}},
{offset + num - 1, &Entry{Term: num - 1}}, {offset + num - 1, &pb.Entry{Term: num - 1}},
{offset + num, nil}, {offset + num, nil},
} }
@ -276,18 +278,18 @@ func TestSlice(t *testing.T) {
l := &raftLog{offset: offset} l := &raftLog{offset: offset}
for i = 0; i < num; i++ { for i = 0; i < num; i++ {
l.ents = append(l.ents, Entry{Term: i}) l.ents = append(l.ents, pb.Entry{Term: i})
} }
tests := []struct { tests := []struct {
from int64 from int64
to int64 to int64
w []Entry w []pb.Entry
}{ }{
{offset - 1, offset + 1, nil}, {offset - 1, offset + 1, nil},
{offset, offset + 1, []Entry{{Term: 0}}}, {offset, offset + 1, []pb.Entry{{Term: 0}}},
{offset + num/2, offset + num/2 + 1, []Entry{{Term: num / 2}}}, {offset + num/2, offset + num/2 + 1, []pb.Entry{{Term: num / 2}}},
{offset + num - 1, offset + num, []Entry{{Term: num - 1}}}, {offset + num - 1, offset + num, []pb.Entry{{Term: num - 1}}},
{offset + num, offset + num + 1, nil}, {offset + num, offset + num + 1, nil},
{offset + num/2, offset + num/2, nil}, {offset + num/2, offset + num/2, nil},

View File

@ -3,38 +3,40 @@ package raft
import ( import (
"code.google.com/p/go.net/context" "code.google.com/p/go.net/context"
pb "github.com/coreos/etcd/raft/raftpb"
) )
type Ready struct { type Ready struct {
// The current state of a Node // The current state of a Node
State pb.State
// Entries specifies entries to be saved to stable storage BEFORE // Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent. // Messages are sent.
Entries []Entry Entries []pb.Entry
// CommittedEntries specifies entries to be committed to a // CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable // store/state-machine. These have previously been committed to stable
// store. // store.
CommittedEntries []Entry CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are // Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage. // committed to stable storage.
Messages []Message Messages []pb.Message
} }
func (a State) Equal(b State) bool { func isStateEqual(a, b pb.State) bool {
return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex
} }
func (rd Ready) containsUpdates(prev Ready) bool { func (rd Ready) containsUpdates(prev Ready) bool {
return !prev.State.Equal(rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 return !isStateEqual(prev.State, rd.State) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
} }
type Node struct { type Node struct {
ctx context.Context ctx context.Context
propc chan Message propc chan pb.Message
recvc chan Message recvc chan pb.Message
readyc chan Ready readyc chan Ready
tickc chan struct{} tickc chan struct{}
} }
@ -42,8 +44,8 @@ type Node struct {
func Start(ctx context.Context, id int64, peers []int64) Node { func Start(ctx context.Context, id int64, peers []int64) Node {
n := Node{ n := Node{
ctx: ctx, ctx: ctx,
propc: make(chan Message), propc: make(chan pb.Message),
recvc: make(chan Message), recvc: make(chan pb.Message),
readyc: make(chan Ready), readyc: make(chan Ready),
tickc: make(chan struct{}), tickc: make(chan struct{}),
} }
@ -109,12 +111,12 @@ func (n *Node) Tick() error {
// Propose proposes data be appended to the log. // Propose proposes data be appended to the log.
func (n *Node) Propose(ctx context.Context, id int64, data []byte) error { func (n *Node) Propose(ctx context.Context, id int64, data []byte) error {
return n.Step(ctx, Message{Type: msgProp, Entries: []Entry{{Id: id, Data: data}}}) return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Id: id, Data: data}}})
} }
// Step advances the state machine using msgs. The ctx.Err() will be returned, // Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any. // if any.
func (n *Node) Step(ctx context.Context, m Message) error { func (n *Node) Step(ctx context.Context, m pb.Message) error {
ch := n.recvc ch := n.recvc
if m.Type == msgProp { if m.Type == msgProp {
ch = n.propc ch = n.propc
@ -135,7 +137,7 @@ func (n *Node) Ready() <-chan Ready {
return n.readyc return n.readyc
} }
type byMsgType []Message type byMsgType []pb.Message
func (msgs byMsgType) Len() int { return len(msgs) } func (msgs byMsgType) Len() int { return len(msgs) }
func (msgs byMsgType) Less(i, j int) bool { return msgs[i].Type == msgProp } func (msgs byMsgType) Less(i, j int) bool { return msgs[i].Type == msgProp }

View File

@ -4,6 +4,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"sort" "sort"
pb "github.com/coreos/etcd/raft/raftpb"
) )
const none = -1 const none = -1
@ -91,7 +93,7 @@ func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type raft struct { type raft struct {
State pb.State
id int64 id int64
@ -107,7 +109,7 @@ type raft struct {
votes map[int64]bool votes map[int64]bool
msgs []Message msgs []pb.Message
// the leader id // the leader id
lead int64 lead int64
@ -161,7 +163,7 @@ func (r *raft) poll(id int64, v bool) (granted int) {
} }
// send persists state to stable storage and then sends to its mailbox. // send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m Message) { func (r *raft) send(m pb.Message) {
m.From = r.id m.From = r.id
m.Term = r.Term m.Term = r.Term
r.msgs = append(r.msgs, m) r.msgs = append(r.msgs, m)
@ -170,7 +172,7 @@ func (r *raft) send(m Message) {
// sendAppend sends RRPC, with entries to the given peer. // sendAppend sends RRPC, with entries to the given peer.
func (r *raft) sendAppend(to int64) { func (r *raft) sendAppend(to int64) {
pr := r.prs[to] pr := r.prs[to]
m := Message{} m := pb.Message{}
m.To = to m.To = to
m.Index = pr.next - 1 m.Index = pr.next - 1
if r.needSnapshot(m.Index) { if r.needSnapshot(m.Index) {
@ -189,7 +191,7 @@ func (r *raft) sendAppend(to int64) {
func (r *raft) sendHeartbeat(to int64) { func (r *raft) sendHeartbeat(to int64) {
pr := r.prs[to] pr := r.prs[to]
index := max(pr.next-1, r.raftLog.lastIndex()) index := max(pr.next-1, r.raftLog.lastIndex())
m := Message{ m := pb.Message{
To: to, To: to,
Type: msgApp, Type: msgApp,
Index: index, Index: index,
@ -248,7 +250,7 @@ func (r *raft) q() int {
return len(r.prs)/2 + 1 return len(r.prs)/2 + 1
} }
func (r *raft) appendEntry(e Entry) { func (r *raft) appendEntry(e pb.Entry) {
e.Term = r.Term e.Term = r.Term
e.Index = r.raftLog.lastIndex() + 1 e.Index = r.raftLog.lastIndex() + 1
r.LastIndex = r.raftLog.append(r.raftLog.lastIndex(), e) r.LastIndex = r.raftLog.append(r.raftLog.lastIndex(), e)
@ -283,22 +285,22 @@ func (r *raft) becomeLeader() {
r.state = stateLeader r.state = stateLeader
for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
if e.isConfig() { if isConfig(e) {
r.configuring = true r.configuring = true
} }
} }
r.appendEntry(Entry{Type: Normal, Data: nil}) r.appendEntry(pb.Entry{Type: Normal, Data: nil})
} }
func (r *raft) ReadMessages() []Message { func (r *raft) ReadMessages() []pb.Message {
msgs := r.msgs msgs := r.msgs
r.msgs = make([]Message, 0) r.msgs = make([]pb.Message, 0)
return msgs return msgs
} }
func (r *raft) Step(m Message) error { func (r *raft) Step(m pb.Message) error {
// TODO(bmizerany): this likely allocs - prevent that. // TODO(bmizerany): this likely allocs - prevent that.
defer func() { r.Commit = r.raftLog.committed }() defer func() { r.Commit = r.raftLog.committed }()
@ -312,7 +314,7 @@ func (r *raft) Step(m Message) error {
continue continue
} }
lasti := r.raftLog.lastIndex() lasti := r.raftLog.lastIndex()
r.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) r.send(pb.Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)})
} }
} }
@ -333,20 +335,20 @@ func (r *raft) Step(m Message) error {
return nil return nil
} }
func (r *raft) handleAppendEntries(m Message) { func (r *raft) handleAppendEntries(m pb.Message) {
if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) { if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
r.LastIndex = r.raftLog.lastIndex() r.LastIndex = r.raftLog.lastIndex()
r.send(Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()})
} else { } else {
r.send(Message{To: m.From, Type: msgAppResp, Index: -1}) r.send(pb.Message{To: m.From, Type: msgAppResp, Index: -1})
} }
} }
func (r *raft) handleSnapshot(m Message) { func (r *raft) handleSnapshot(m pb.Message) {
if r.restore(m.Snapshot) { if r.restore(m.Snapshot) {
r.send(Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()}) r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()})
} else { } else {
r.send(Message{To: m.From, Type: msgAppResp, Index: r.raftLog.committed}) r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.committed})
} }
} }
@ -363,9 +365,9 @@ func (r *raft) removeNode(id int64) {
r.configuring = false r.configuring = false
} }
type stepFunc func(r *raft, m Message) type stepFunc func(r *raft, m pb.Message)
func stepLeader(r *raft, m Message) { func stepLeader(r *raft, m pb.Message) {
switch m.Type { switch m.Type {
case msgBeat: case msgBeat:
r.bcastHeartbeat() r.bcastHeartbeat()
@ -374,7 +376,7 @@ func stepLeader(r *raft, m Message) {
panic("unexpected length(entries) of a msgProp") panic("unexpected length(entries) of a msgProp")
} }
e := m.Entries[0] e := m.Entries[0]
if e.isConfig() { if isConfig(e) {
if r.configuring { if r.configuring {
panic("pending conf") panic("pending conf")
} }
@ -393,11 +395,11 @@ func stepLeader(r *raft, m Message) {
} }
} }
case msgVote: case msgVote:
r.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: -1})
} }
} }
func stepCandidate(r *raft, m Message) { func stepCandidate(r *raft, m pb.Message) {
switch m.Type { switch m.Type {
case msgProp: case msgProp:
panic("no leader") panic("no leader")
@ -408,7 +410,7 @@ func stepCandidate(r *raft, m Message) {
r.becomeFollower(m.Term, m.From) r.becomeFollower(m.Term, m.From)
r.handleSnapshot(m) r.handleSnapshot(m)
case msgVote: case msgVote:
r.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: -1})
case msgVoteResp: case msgVoteResp:
gr := r.poll(m.From, m.Index >= 0) gr := r.poll(m.From, m.Index >= 0)
switch r.q() { switch r.q() {
@ -421,7 +423,7 @@ func stepCandidate(r *raft, m Message) {
} }
} }
func stepFollower(r *raft, m Message) { func stepFollower(r *raft, m pb.Message) {
switch m.Type { switch m.Type {
case msgProp: case msgProp:
if r.lead == none { if r.lead == none {
@ -437,9 +439,9 @@ func stepFollower(r *raft, m Message) {
case msgVote: case msgVote:
if (r.Vote == none || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { if (r.Vote == none || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.Vote = m.From r.Vote = m.From
r.send(Message{To: m.From, Type: msgVoteResp, Index: r.raftLog.lastIndex()}) r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: r.raftLog.lastIndex()})
} else { } else {
r.send(Message{To: m.From, Type: msgVoteResp, Index: -1}) r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: -1})
} }
} }
} }
@ -451,7 +453,7 @@ func (r *raft) compact(d []byte) {
// restore recovers the statemachine from a snapshot. It restores the log and the // restore recovers the statemachine from a snapshot. It restores the log and the
// configuration of statemachine. // configuration of statemachine.
func (r *raft) restore(s Snapshot) bool { func (r *raft) restore(s pb.Snapshot) bool {
if s.Index <= r.raftLog.committed { if s.Index <= r.raftLog.committed {
return false return false
} }
@ -496,7 +498,7 @@ func (r *raft) delProgress(id int64) {
delete(r.prs, id) delete(r.prs, id)
} }
func (r *raft) loadEnts(ents []Entry) { func (r *raft) loadEnts(ents []pb.Entry) {
if !r.raftLog.isEmpty() { if !r.raftLog.isEmpty() {
panic("cannot load entries when log is not empty") panic("cannot load entries when log is not empty")
} }
@ -504,7 +506,7 @@ func (r *raft) loadEnts(ents []Entry) {
r.raftLog.unstable = r.raftLog.lastIndex() + 1 r.raftLog.unstable = r.raftLog.lastIndex() + 1
} }
func (r *raft) loadState(state State) { func (r *raft) loadState(state pb.State) {
r.raftLog.committed = state.Commit r.raftLog.committed = state.Commit
r.Term = state.Term r.Term = state.Term
r.Vote = state.Vote r.Vote = state.Vote

View File

@ -6,18 +6,20 @@ import (
"reflect" "reflect"
"sort" "sort"
"testing" "testing"
pb "github.com/coreos/etcd/raft/raftpb"
) )
// nextEnts returns the appliable entries and updates the applied index // nextEnts returns the appliable entries and updates the applied index
func (r *raft) nextEnts() (ents []Entry) { func (r *raft) nextEnts() (ents []pb.Entry) {
ents = r.raftLog.nextEnts() ents = r.raftLog.nextEnts()
r.raftLog.resetNextEnts() r.raftLog.resetNextEnts()
return ents return ents
} }
type Interface interface { type Interface interface {
Step(m Message) error Step(m pb.Message) error
ReadMessages() []Message ReadMessages() []pb.Message
} }
func TestLeaderElection(t *testing.T) { func TestLeaderElection(t *testing.T) {
@ -39,7 +41,7 @@ func TestLeaderElection(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
sm := tt.network.peers[0].(*raft) sm := tt.network.peers[0].(*raft)
if sm.state != tt.state { if sm.state != tt.state {
t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state) t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
@ -53,30 +55,30 @@ func TestLeaderElection(t *testing.T) {
func TestLogReplication(t *testing.T) { func TestLogReplication(t *testing.T) {
tests := []struct { tests := []struct {
*network *network
msgs []Message msgs []pb.Message
wcommitted int64 wcommitted int64
}{ }{
{ {
newNetwork(nil, nil, nil), newNetwork(nil, nil, nil),
[]Message{ []pb.Message{
{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, {From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
}, },
2, 2,
}, },
{ {
newNetwork(nil, nil, nil), newNetwork(nil, nil, nil),
[]Message{ []pb.Message{
{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, {From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
{From: 0, To: 1, Type: msgHup}, {From: 0, To: 1, Type: msgHup},
{From: 0, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, {From: 0, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
}, },
4, 4,
}, },
} }
for i, tt := range tests { for i, tt := range tests {
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
for _, m := range tt.msgs { for _, m := range tt.msgs {
tt.send(m) tt.send(m)
@ -89,13 +91,13 @@ func TestLogReplication(t *testing.T) {
t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted) t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
} }
ents := make([]Entry, 0) ents := make([]pb.Entry, 0)
for _, e := range sm.nextEnts() { for _, e := range sm.nextEnts() {
if e.Data != nil { if e.Data != nil {
ents = append(ents, e) ents = append(ents, e)
} }
} }
props := make([]Message, 0) props := make([]pb.Message, 0)
for _, m := range tt.msgs { for _, m := range tt.msgs {
if m.Type == msgProp { if m.Type == msgProp {
props = append(props, m) props = append(props, m)
@ -112,9 +114,9 @@ func TestLogReplication(t *testing.T) {
func TestSingleNodeCommit(t *testing.T) { func TestSingleNodeCommit(t *testing.T) {
tt := newNetwork(nil) tt := newNetwork(nil)
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
sm := tt.peers[0].(*raft) sm := tt.peers[0].(*raft)
if sm.raftLog.committed != 3 { if sm.raftLog.committed != 3 {
@ -127,15 +129,15 @@ func TestSingleNodeCommit(t *testing.T) {
// filtered. // filtered.
func TestCannotCommitWithoutNewTermEntry(t *testing.T) { func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
tt := newNetwork(nil, nil, nil, nil, nil) tt := newNetwork(nil, nil, nil, nil, nil)
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
// 0 cannot reach 2,3,4 // 0 cannot reach 2,3,4
tt.cut(0, 2) tt.cut(0, 2)
tt.cut(0, 3) tt.cut(0, 3)
tt.cut(0, 4) tt.cut(0, 4)
tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
sm := tt.peers[0].(*raft) sm := tt.peers[0].(*raft)
if sm.raftLog.committed != 1 { if sm.raftLog.committed != 1 {
@ -148,7 +150,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
tt.ignore(msgApp) tt.ignore(msgApp)
// elect 1 as the new leader with term 2 // elect 1 as the new leader with term 2
tt.send(Message{From: 1, To: 1, Type: msgHup}) tt.send(pb.Message{From: 1, To: 1, Type: msgHup})
// no log entries from previous term should be committed // no log entries from previous term should be committed
sm = tt.peers[1].(*raft) sm = tt.peers[1].(*raft)
@ -161,14 +163,14 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
// send out a heartbeat // send out a heartbeat
// after append a ChangeTerm entry from the current term, all entries // after append a ChangeTerm entry from the current term, all entries
// should be committed // should be committed
tt.send(Message{From: 1, To: 1, Type: msgBeat}) tt.send(pb.Message{From: 1, To: 1, Type: msgBeat})
if sm.raftLog.committed != 4 { if sm.raftLog.committed != 4 {
t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4) t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
} }
// still be able to append a entry // still be able to append a entry
tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
if sm.raftLog.committed != 5 { if sm.raftLog.committed != 5 {
t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5) t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
@ -179,15 +181,15 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
// when leader changes, no new proposal comes in. // when leader changes, no new proposal comes in.
func TestCommitWithoutNewTermEntry(t *testing.T) { func TestCommitWithoutNewTermEntry(t *testing.T) {
tt := newNetwork(nil, nil, nil, nil, nil) tt := newNetwork(nil, nil, nil, nil, nil)
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
// 0 cannot reach 2,3,4 // 0 cannot reach 2,3,4
tt.cut(0, 2) tt.cut(0, 2)
tt.cut(0, 3) tt.cut(0, 3)
tt.cut(0, 4) tt.cut(0, 4)
tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
sm := tt.peers[0].(*raft) sm := tt.peers[0].(*raft)
if sm.raftLog.committed != 1 { if sm.raftLog.committed != 1 {
@ -200,7 +202,7 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
// elect 1 as the new leader with term 2 // elect 1 as the new leader with term 2
// after append a ChangeTerm entry from the current term, all entries // after append a ChangeTerm entry from the current term, all entries
// should be committed // should be committed
tt.send(Message{From: 1, To: 1, Type: msgHup}) tt.send(pb.Message{From: 1, To: 1, Type: msgHup})
if sm.raftLog.committed != 4 { if sm.raftLog.committed != 4 {
t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4) t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
@ -215,13 +217,13 @@ func TestDuelingCandidates(t *testing.T) {
nt := newNetwork(a, b, c) nt := newNetwork(a, b, c)
nt.cut(0, 2) nt.cut(0, 2)
nt.send(Message{From: 0, To: 0, Type: msgHup}) nt.send(pb.Message{From: 0, To: 0, Type: msgHup})
nt.send(Message{From: 2, To: 2, Type: msgHup}) nt.send(pb.Message{From: 2, To: 2, Type: msgHup})
nt.recover() nt.recover()
nt.send(Message{From: 2, To: 2, Type: msgHup}) nt.send(pb.Message{From: 2, To: 2, Type: msgHup})
wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1} wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1}
tests := []struct { tests := []struct {
sm *raft sm *raft
state stateType state stateType
@ -256,15 +258,15 @@ func TestCandidateConcede(t *testing.T) {
tt := newNetwork(nil, nil, nil) tt := newNetwork(nil, nil, nil)
tt.isolate(0) tt.isolate(0)
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
tt.send(Message{From: 2, To: 2, Type: msgHup}) tt.send(pb.Message{From: 2, To: 2, Type: msgHup})
// heal the partition // heal the partition
tt.recover() tt.recover()
data := []byte("force follower") data := []byte("force follower")
// send a proposal to 2 to flush out a msgApp to 0 // send a proposal to 2 to flush out a msgApp to 0
tt.send(Message{From: 2, To: 2, Type: msgProp, Entries: []Entry{{Data: data}}}) tt.send(pb.Message{From: 2, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: data}}})
a := tt.peers[0].(*raft) a := tt.peers[0].(*raft)
if g := a.state; g != stateFollower { if g := a.state; g != stateFollower {
@ -273,7 +275,7 @@ func TestCandidateConcede(t *testing.T) {
if g := a.Term; g != 1 { if g := a.Term; g != 1 {
t.Errorf("term = %d, want %d", g, 1) t.Errorf("term = %d, want %d", g, 1)
} }
wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}) wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
for i, p := range tt.peers { for i, p := range tt.peers {
if sm, ok := p.(*raft); ok { if sm, ok := p.(*raft); ok {
l := ltoa(sm.raftLog) l := ltoa(sm.raftLog)
@ -288,7 +290,7 @@ func TestCandidateConcede(t *testing.T) {
func TestSingleNodeCandidate(t *testing.T) { func TestSingleNodeCandidate(t *testing.T) {
tt := newNetwork(nil) tt := newNetwork(nil)
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
sm := tt.peers[0].(*raft) sm := tt.peers[0].(*raft)
if sm.state != stateLeader { if sm.state != stateLeader {
@ -299,14 +301,14 @@ func TestSingleNodeCandidate(t *testing.T) {
func TestOldMessages(t *testing.T) { func TestOldMessages(t *testing.T) {
tt := newNetwork(nil, nil, nil) tt := newNetwork(nil, nil, nil)
// make 0 leader @ term 3 // make 0 leader @ term 3
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
tt.send(Message{From: 1, To: 1, Type: msgHup}) tt.send(pb.Message{From: 1, To: 1, Type: msgHup})
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
// pretend we're an old leader trying to make progress // pretend we're an old leader trying to make progress
tt.send(Message{From: 0, To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}) tt.send(pb.Message{From: 0, To: 0, Type: msgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
l := &raftLog{ l := &raftLog{
ents: []Entry{ ents: []pb.Entry{
{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {}, {Type: Normal, Data: nil, Term: 1, Index: 1},
{Type: Normal, Data: nil, Term: 2, Index: 2}, {Type: Normal, Data: nil, Term: 3, Index: 3}, {Type: Normal, Data: nil, Term: 2, Index: 2}, {Type: Normal, Data: nil, Term: 3, Index: 3},
}, },
@ -340,7 +342,7 @@ func TestProposal(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
send := func(m Message) { send := func(m pb.Message) {
defer func() { defer func() {
// only recover is we expect it to panic so // only recover is we expect it to panic so
// panics we don't expect go up. // panics we don't expect go up.
@ -357,12 +359,12 @@ func TestProposal(t *testing.T) {
data := []byte("somedata") data := []byte("somedata")
// promote 0 the leader // promote 0 the leader
send(Message{From: 0, To: 0, Type: msgHup}) send(pb.Message{From: 0, To: 0, Type: msgHup})
send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: data}}}) send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Data: data}}})
wantLog := newLog() wantLog := newLog()
if tt.success { if tt.success {
wantLog = &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2} wantLog = &raftLog{ents: []pb.Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
} }
base := ltoa(wantLog) base := ltoa(wantLog)
for i, p := range tt.peers { for i, p := range tt.peers {
@ -391,12 +393,12 @@ func TestProposalByProxy(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
// promote 0 the leader // promote 0 the leader
tt.send(Message{From: 0, To: 0, Type: msgHup}) tt.send(pb.Message{From: 0, To: 0, Type: msgHup})
// propose via follower // propose via follower
tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}) tt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2} wantLog := &raftLog{ents: []pb.Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
base := ltoa(wantLog) base := ltoa(wantLog)
for i, p := range tt.peers { for i, p := range tt.peers {
if sm, ok := p.(*raft); ok { if sm, ok := p.(*raft); ok {
@ -418,29 +420,29 @@ func TestProposalByProxy(t *testing.T) {
func TestCommit(t *testing.T) { func TestCommit(t *testing.T) {
tests := []struct { tests := []struct {
matches []int64 matches []int64
logs []Entry logs []pb.Entry
smTerm int64 smTerm int64
w int64 w int64
}{ }{
// single // single
{[]int64{1}, []Entry{{}, {Term: 1}}, 1, 1}, {[]int64{1}, []pb.Entry{{}, {Term: 1}}, 1, 1},
{[]int64{1}, []Entry{{}, {Term: 1}}, 2, 0}, {[]int64{1}, []pb.Entry{{}, {Term: 1}}, 2, 0},
{[]int64{2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, {[]int64{2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int64{1}, []Entry{{}, {Term: 2}}, 2, 1}, {[]int64{1}, []pb.Entry{{}, {Term: 2}}, 2, 1},
// odd // odd
{[]int64{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, {[]int64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int64{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int64{2, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, {[]int64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int64{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int64{2, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
// even // even
{[]int64{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, {[]int64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int64{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int64{2, 1, 1, 1}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, {[]int64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
{[]int64{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int64{2, 1, 1, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
{[]int64{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, {[]int64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
{[]int64{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int64{2, 1, 2, 2}, []pb.Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
} }
for i, tt := range tests { for i, tt := range tests {
@ -448,7 +450,7 @@ func TestCommit(t *testing.T) {
for j := 0; j < len(tt.matches); j++ { for j := 0; j < len(tt.matches); j++ {
prs[int64(j)] = &progress{tt.matches[j], tt.matches[j] + 1} prs[int64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
} }
sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, State: State{Term: tt.smTerm}} sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, State: pb.State{Term: tt.smTerm}}
sm.maybeCommit() sm.maybeCommit()
if g := sm.raftLog.committed; g != tt.w { if g := sm.raftLog.committed; g != tt.w {
t.Errorf("#%d: committed = %d, want %d", i, g, tt.w) t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
@ -463,32 +465,32 @@ func TestCommit(t *testing.T) {
// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry). // 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
func TestHandleMsgApp(t *testing.T) { func TestHandleMsgApp(t *testing.T) {
tests := []struct { tests := []struct {
m Message m pb.Message
wIndex int64 wIndex int64
wCommit int64 wCommit int64
wAccept bool wAccept bool
}{ }{
// Ensure 1 // Ensure 1
{Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, false}, // previous log mismatch {pb.Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, false}, // previous log mismatch
{Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, false}, // previous log non-exist {pb.Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, false}, // previous log non-exist
// Ensure 2 // Ensure 2
{Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, true}, {pb.Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, true},
{Message{Type: msgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []Entry{{Term: 2}}}, 1, 1, true}, {pb.Message{Type: msgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, true},
{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []Entry{{Term: 2}, {Term: 2}}}, 4, 3, true}, {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, true},
{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []Entry{{Term: 2}}}, 3, 3, true}, {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, true},
{Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []Entry{{Term: 2}}}, 2, 2, true}, {pb.Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, true},
// Ensure 3 // Ensure 3
{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 2}, 2, 2, true}, {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 2}, 2, 2, true},
{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, true}, // commit upto min(commit, last) {pb.Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, true}, // commit upto min(commit, last)
} }
for i, tt := range tests { for i, tt := range tests {
sm := &raft{ sm := &raft{
state: stateFollower, state: stateFollower,
State: State{Term: 2}, State: pb.State{Term: 2},
raftLog: &raftLog{committed: 0, ents: []Entry{{}, {Term: 1}, {Term: 2}}}, raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
} }
sm.handleAppendEntries(tt.m) sm.handleAppendEntries(tt.m)
@ -548,12 +550,12 @@ func TestRecvMsgVote(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
sm := &raft{ sm := &raft{
state: tt.state, state: tt.state,
State: State{Vote: tt.voteFor}, State: pb.State{Vote: tt.voteFor},
raftLog: &raftLog{ents: []Entry{{}, {Term: 2}, {Term: 2}}}, raftLog: &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}},
} }
sm.Step(Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term}) sm.Step(pb.Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term})
msgs := sm.ReadMessages() msgs := sm.ReadMessages()
if g := len(msgs); g != 1 { if g := len(msgs); g != 1 {
@ -624,7 +626,7 @@ func TestConf(t *testing.T) {
sm.becomeCandidate() sm.becomeCandidate()
sm.becomeLeader() sm.becomeLeader()
sm.Step(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Type: AddNode}}}) sm.Step(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Type: AddNode}}})
if sm.raftLog.lastIndex() != 2 { if sm.raftLog.lastIndex() != 2 {
t.Errorf("lastindex = %d, want %d", sm.raftLog.lastIndex(), 1) t.Errorf("lastindex = %d, want %d", sm.raftLog.lastIndex(), 1)
} }
@ -638,7 +640,7 @@ func TestConf(t *testing.T) {
// deny the second configuration change request if there is a pending one // deny the second configuration change request if there is a pending one
paniced := false paniced := false
defer func() { recover(); paniced = true }() defer func() { recover(); paniced = true }()
sm.Step(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Type: AddNode}}}) sm.Step(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{Type: AddNode}}})
if !paniced { if !paniced {
t.Errorf("expected panic") t.Errorf("expected panic")
} }
@ -661,7 +663,7 @@ func TestConfChangeLeader(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
sm := newRaft(0, []int64{0}) sm := newRaft(0, []int64{0})
sm.raftLog = &raftLog{ents: []Entry{{}, {Type: tt.et}}} sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Type: tt.et}}}
sm.becomeCandidate() sm.becomeCandidate()
sm.becomeLeader() sm.becomeLeader()
@ -701,7 +703,7 @@ func TestAllServerStepdown(t *testing.T) {
} }
for j, msgType := range tmsgTypes { for j, msgType := range tmsgTypes {
sm.Step(Message{From: 1, Type: msgType, Term: tterm, LogTerm: tterm}) sm.Step(pb.Message{From: 1, Type: msgType, Term: tterm, LogTerm: tterm})
if sm.state != tt.wstate { if sm.state != tt.wstate {
t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate) t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
@ -738,11 +740,11 @@ func TestLeaderAppResp(t *testing.T) {
// sm term is 1 after it becomes the leader. // sm term is 1 after it becomes the leader.
// thus the last log term must be 1 to be committed. // thus the last log term must be 1 to be committed.
sm := newRaft(0, []int64{0, 1, 2}) sm := newRaft(0, []int64{0, 1, 2})
sm.raftLog = &raftLog{ents: []Entry{{}, {Term: 0}, {Term: 1}}} sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
sm.becomeCandidate() sm.becomeCandidate()
sm.becomeLeader() sm.becomeLeader()
sm.ReadMessages() sm.ReadMessages()
sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.Term}) sm.Step(pb.Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.Term})
msgs := sm.ReadMessages() msgs := sm.ReadMessages()
if len(msgs) != tt.wmsgNum { if len(msgs) != tt.wmsgNum {
@ -773,10 +775,10 @@ func TestRecvMsgBeat(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
sm := newRaft(0, []int64{0, 1, 2}) sm := newRaft(0, []int64{0, 1, 2})
sm.raftLog = &raftLog{ents: []Entry{{}, {Term: 0}, {Term: 1}}} sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
sm.Term = 1 sm.Term = 1
sm.state = tt.state sm.state = tt.state
sm.Step(Message{From: 0, To: 0, Type: msgBeat}) sm.Step(pb.Message{From: 0, To: 0, Type: msgBeat})
msgs := sm.ReadMessages() msgs := sm.ReadMessages()
if len(msgs) != tt.wMsg { if len(msgs) != tt.wMsg {
@ -791,7 +793,7 @@ func TestRecvMsgBeat(t *testing.T) {
} }
func TestRestore(t *testing.T) { func TestRestore(t *testing.T) {
s := Snapshot{ s := pb.Snapshot{
Index: defaultCompactThreshold + 1, Index: defaultCompactThreshold + 1,
Term: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1,
Nodes: []int64{0, 1, 2}, Nodes: []int64{0, 1, 2},
@ -825,7 +827,7 @@ func TestRestore(t *testing.T) {
} }
func TestProvideSnap(t *testing.T) { func TestProvideSnap(t *testing.T) {
s := Snapshot{ s := pb.Snapshot{
Index: defaultCompactThreshold + 1, Index: defaultCompactThreshold + 1,
Term: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1,
Nodes: []int64{0, 1}, Nodes: []int64{0, 1},
@ -838,7 +840,7 @@ func TestProvideSnap(t *testing.T) {
sm.becomeCandidate() sm.becomeCandidate()
sm.becomeLeader() sm.becomeLeader()
sm.Step(Message{From: 0, To: 0, Type: msgBeat}) sm.Step(pb.Message{From: 0, To: 0, Type: msgBeat})
msgs := sm.ReadMessages() msgs := sm.ReadMessages()
if len(msgs) != 1 { if len(msgs) != 1 {
t.Errorf("len(msgs) = %d, want 1", len(msgs)) t.Errorf("len(msgs) = %d, want 1", len(msgs))
@ -852,7 +854,7 @@ func TestProvideSnap(t *testing.T) {
// node 1 needs a snapshot // node 1 needs a snapshot
sm.prs[1].next = sm.raftLog.offset sm.prs[1].next = sm.raftLog.offset
sm.Step(Message{From: 1, To: 0, Type: msgAppResp, Index: -1}) sm.Step(pb.Message{From: 1, To: 0, Type: msgAppResp, Index: -1})
msgs = sm.ReadMessages() msgs = sm.ReadMessages()
if len(msgs) != 1 { if len(msgs) != 1 {
t.Errorf("len(msgs) = %d, want 1", len(msgs)) t.Errorf("len(msgs) = %d, want 1", len(msgs))
@ -864,12 +866,12 @@ func TestProvideSnap(t *testing.T) {
} }
func TestRestoreFromSnapMsg(t *testing.T) { func TestRestoreFromSnapMsg(t *testing.T) {
s := Snapshot{ s := pb.Snapshot{
Index: defaultCompactThreshold + 1, Index: defaultCompactThreshold + 1,
Term: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1,
Nodes: []int64{0, 1}, Nodes: []int64{0, 1},
} }
m := Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s} m := pb.Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s}
sm := newRaft(1, []int64{0, 1}) sm := newRaft(1, []int64{0, 1})
sm.Step(m) sm.Step(m)
@ -881,18 +883,18 @@ func TestRestoreFromSnapMsg(t *testing.T) {
func TestSlowNodeRestore(t *testing.T) { func TestSlowNodeRestore(t *testing.T) {
nt := newNetwork(nil, nil, nil) nt := newNetwork(nil, nil, nil)
nt.send(Message{From: 0, To: 0, Type: msgHup}) nt.send(pb.Message{From: 0, To: 0, Type: msgHup})
nt.isolate(2) nt.isolate(2)
for j := 0; j < defaultCompactThreshold+1; j++ { for j := 0; j < defaultCompactThreshold+1; j++ {
nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}}) nt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{}}})
} }
lead := nt.peers[0].(*raft) lead := nt.peers[0].(*raft)
lead.nextEnts() lead.nextEnts()
lead.compact(nil) lead.compact(nil)
nt.recover() nt.recover()
nt.send(Message{From: 0, To: 0, Type: msgBeat}) nt.send(pb.Message{From: 0, To: 0, Type: msgBeat})
follower := nt.peers[2].(*raft) follower := nt.peers[2].(*raft)
if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) { if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
@ -900,16 +902,16 @@ func TestSlowNodeRestore(t *testing.T) {
} }
committed := follower.raftLog.lastIndex() committed := follower.raftLog.lastIndex()
nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}}) nt.send(pb.Message{From: 0, To: 0, Type: msgProp, Entries: []pb.Entry{{}}})
if follower.raftLog.committed != committed+1 { if follower.raftLog.committed != committed+1 {
t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, committed+1) t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, committed+1)
} }
} }
func ents(terms ...int64) *raft { func ents(terms ...int64) *raft {
ents := []Entry{{}} ents := []pb.Entry{{}}
for _, term := range terms { for _, term := range terms {
ents = append(ents, Entry{Term: term}) ents = append(ents, pb.Entry{Term: term})
} }
sm := &raft{raftLog: &raftLog{ents: ents}} sm := &raft{raftLog: &raftLog{ents: ents}}
@ -961,7 +963,7 @@ func newNetwork(peers ...Interface) *network {
} }
} }
func (nw *network) send(msgs ...Message) { func (nw *network) send(msgs ...pb.Message) {
for len(msgs) > 0 { for len(msgs) > 0 {
m := msgs[0] m := msgs[0]
p := nw.peers[m.To] p := nw.peers[m.To]
@ -998,8 +1000,8 @@ func (nw *network) recover() {
nw.ignorem = make(map[int64]bool) nw.ignorem = make(map[int64]bool)
} }
func (nw *network) filter(msgs []Message) []Message { func (nw *network) filter(msgs []pb.Message) []pb.Message {
mm := make([]Message, 0) mm := make([]pb.Message, 0)
for _, m := range msgs { for _, m := range msgs {
if nw.ignorem[m.Type] { if nw.ignorem[m.Type] {
continue continue
@ -1025,7 +1027,7 @@ type connem struct {
type blackHole struct{} type blackHole struct{}
func (blackHole) Step(Message) error { return nil } func (blackHole) Step(pb.Message) error { return nil }
func (blackHole) ReadMessages() []Message { return nil } func (blackHole) ReadMessages() []pb.Message { return nil }
var nopStepper = &blackHole{} var nopStepper = &blackHole{}

View File

@ -1,26 +1,26 @@
// Code generated by protoc-gen-gogo. // Code generated by protoc-gen-gogo.
// source: protos.proto // source: raft.proto
// DO NOT EDIT! // DO NOT EDIT!
/* /*
Package raft is a generated protocol buffer package. Package raftis a generated protocol buffer package.
It is generated from these files: It is generated from these files:
protos.proto raft.proto
state.proto
It has these top-level messages: It has these top-level messages:
Entry Entry
Snapshot Snapshot
Message Message
State
*/ */
package raft package raftpb
import proto "code.google.com/p/gogoprotobuf/proto" import proto "code.google.com/p/gogoprotobuf/proto"
import json "encoding/json" import json "encoding/json"
import math "math" import math "math"
// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb" // discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.
import io "io" import io "io"
import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto" import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
@ -56,22 +56,34 @@ func (m *Snapshot) String() string { return proto.CompactTextString(m) }
func (*Snapshot) ProtoMessage() {} func (*Snapshot) ProtoMessage() {}
type Message struct { type Message struct {
Type int64 `protobuf:"varint,1,req,name=type" json:"type"` Type int64 `protobuf:"varint,1,req,name=type" json:"type"`
To int64 `protobuf:"varint,2,req,name=to" json:"to"` To int64 `protobuf:"varint,2,req,name=to" json:"to"`
From int64 `protobuf:"varint,3,req,name=from" json:"from"` From int64 `protobuf:"varint,3,req,name=from" json:"from"`
Term int64 `protobuf:"varint,4,req,name=term" json:"term"` Term int64 `protobuf:"varint,4,req,name=term" json:"term"`
LogTerm int64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"` LogTerm int64 `protobuf:"varint,5,req,name=logTerm" json:"logTerm"`
Index int64 `protobuf:"varint,6,req,name=index" json:"index"` Index int64 `protobuf:"varint,6,req,name=index" json:"index"`
Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"` Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
Commit int64 `protobuf:"varint,8,req,name=commit" json:"commit"` Commit int64 `protobuf:"varint,8,req,name=commit" json:"commit"`
Snapshot Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"` Snapshot Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
} }
func (m *Message) Reset() { *m = Message{} } func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) } func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {} func (*Message) ProtoMessage() {}
type State struct {
Term int64 `protobuf:"varint,1,req,name=term" json:"term"`
Vote int64 `protobuf:"varint,2,req,name=vote" json:"vote"`
Commit int64 `protobuf:"varint,3,req,name=commit" json:"commit"`
LastIndex int64 `protobuf:"varint,4,req,name=lastIndex" json:"lastIndex"`
XXX_unrecognized []byte `json:"-"`
}
func (m *State) Reset() { *m = State{} }
func (m *State) String() string { return proto.CompactTextString(m) }
func (*State) ProtoMessage() {}
func init() { func init() {
} }
func (m *Entry) Unmarshal(data []byte) error { func (m *Entry) Unmarshal(data []byte) error {
@ -503,15 +515,117 @@ func (m *Message) Unmarshal(data []byte) error {
} }
return nil return nil
} }
func (m *State) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Term |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Vote |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Commit |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.LastIndex |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (m *Entry) Size() (n int) { func (m *Entry) Size() (n int) {
var l int var l int
_ = l _ = l
n += 1 + sovProtos(uint64(m.Type)) n += 1 + sovRaft(uint64(m.Type))
n += 1 + sovProtos(uint64(m.Term)) n += 1 + sovRaft(uint64(m.Term))
n += 1 + sovProtos(uint64(m.Index)) n += 1 + sovRaft(uint64(m.Index))
l = len(m.Data) l = len(m.Data)
n += 1 + l + sovProtos(uint64(l)) n += 1 + l + sovRaft(uint64(l))
n += 1 + sovProtos(uint64(m.Id)) n += 1 + sovRaft(uint64(m.Id))
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
@ -521,14 +635,14 @@ func (m *Snapshot) Size() (n int) {
var l int var l int
_ = l _ = l
l = len(m.Data) l = len(m.Data)
n += 1 + l + sovProtos(uint64(l)) n += 1 + l + sovRaft(uint64(l))
if len(m.Nodes) > 0 { if len(m.Nodes) > 0 {
for _, e := range m.Nodes { for _, e := range m.Nodes {
n += 1 + sovProtos(uint64(e)) n += 1 + sovRaft(uint64(e))
} }
} }
n += 1 + sovProtos(uint64(m.Index)) n += 1 + sovRaft(uint64(m.Index))
n += 1 + sovProtos(uint64(m.Term)) n += 1 + sovRaft(uint64(m.Term))
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
@ -537,28 +651,40 @@ func (m *Snapshot) Size() (n int) {
func (m *Message) Size() (n int) { func (m *Message) Size() (n int) {
var l int var l int
_ = l _ = l
n += 1 + sovProtos(uint64(m.Type)) n += 1 + sovRaft(uint64(m.Type))
n += 1 + sovProtos(uint64(m.To)) n += 1 + sovRaft(uint64(m.To))
n += 1 + sovProtos(uint64(m.From)) n += 1 + sovRaft(uint64(m.From))
n += 1 + sovProtos(uint64(m.Term)) n += 1 + sovRaft(uint64(m.Term))
n += 1 + sovProtos(uint64(m.LogTerm)) n += 1 + sovRaft(uint64(m.LogTerm))
n += 1 + sovProtos(uint64(m.Index)) n += 1 + sovRaft(uint64(m.Index))
if len(m.Entries) > 0 { if len(m.Entries) > 0 {
for _, e := range m.Entries { for _, e := range m.Entries {
l = e.Size() l = e.Size()
n += 1 + l + sovProtos(uint64(l)) n += 1 + l + sovRaft(uint64(l))
} }
} }
n += 1 + sovProtos(uint64(m.Commit)) n += 1 + sovRaft(uint64(m.Commit))
l = m.Snapshot.Size() l = m.Snapshot.Size()
n += 1 + l + sovProtos(uint64(l)) n += 1 + l + sovRaft(uint64(l))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *State) Size() (n int) {
var l int
_ = l
n += 1 + sovRaft(uint64(m.Term))
n += 1 + sovRaft(uint64(m.Vote))
n += 1 + sovRaft(uint64(m.Commit))
n += 1 + sovRaft(uint64(m.LastIndex))
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized) n += len(m.XXX_unrecognized)
} }
return n return n
} }
func sovProtos(x uint64) (n int) { func sovRaft(x uint64) (n int) {
for { for {
n++ n++
x >>= 7 x >>= 7
@ -568,8 +694,8 @@ func sovProtos(x uint64) (n int) {
} }
return n return n
} }
func sozProtos(x uint64) (n int) { func sozRaft(x uint64) (n int) {
return sovProtos(uint64((x << 1) ^ uint64((int64(x) >> 63)))) return sovRaft(uint64((x << 1) ^ uint64((int64(x) >> 63))))
} }
func (m *Entry) Marshal() (data []byte, err error) { func (m *Entry) Marshal() (data []byte, err error) {
size := m.Size() size := m.Size()
@ -588,20 +714,20 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) {
_ = l _ = l
data[i] = 0x8 data[i] = 0x8
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Type)) i = encodeVarintRaft(data, i, uint64(m.Type))
data[i] = 0x10 data[i] = 0x10
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Term)) i = encodeVarintRaft(data, i, uint64(m.Term))
data[i] = 0x18 data[i] = 0x18
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Index)) i = encodeVarintRaft(data, i, uint64(m.Index))
data[i] = 0x22 data[i] = 0x22
i++ i++
i = encodeVarintProtos(data, i, uint64(len(m.Data))) i = encodeVarintRaft(data, i, uint64(len(m.Data)))
i += copy(data[i:], m.Data) i += copy(data[i:], m.Data)
data[i] = 0x28 data[i] = 0x28
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Id)) i = encodeVarintRaft(data, i, uint64(m.Id))
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized) i += copy(data[i:], m.XXX_unrecognized)
} }
@ -624,21 +750,21 @@ func (m *Snapshot) MarshalTo(data []byte) (n int, err error) {
_ = l _ = l
data[i] = 0xa data[i] = 0xa
i++ i++
i = encodeVarintProtos(data, i, uint64(len(m.Data))) i = encodeVarintRaft(data, i, uint64(len(m.Data)))
i += copy(data[i:], m.Data) i += copy(data[i:], m.Data)
if len(m.Nodes) > 0 { if len(m.Nodes) > 0 {
for _, num := range m.Nodes { for _, num := range m.Nodes {
data[i] = 0x10 data[i] = 0x10
i++ i++
i = encodeVarintProtos(data, i, uint64(num)) i = encodeVarintRaft(data, i, uint64(num))
} }
} }
data[i] = 0x18 data[i] = 0x18
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Index)) i = encodeVarintRaft(data, i, uint64(m.Index))
data[i] = 0x20 data[i] = 0x20
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Term)) i = encodeVarintRaft(data, i, uint64(m.Term))
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized) i += copy(data[i:], m.XXX_unrecognized)
} }
@ -661,27 +787,27 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) {
_ = l _ = l
data[i] = 0x8 data[i] = 0x8
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Type)) i = encodeVarintRaft(data, i, uint64(m.Type))
data[i] = 0x10 data[i] = 0x10
i++ i++
i = encodeVarintProtos(data, i, uint64(m.To)) i = encodeVarintRaft(data, i, uint64(m.To))
data[i] = 0x18 data[i] = 0x18
i++ i++
i = encodeVarintProtos(data, i, uint64(m.From)) i = encodeVarintRaft(data, i, uint64(m.From))
data[i] = 0x20 data[i] = 0x20
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Term)) i = encodeVarintRaft(data, i, uint64(m.Term))
data[i] = 0x28 data[i] = 0x28
i++ i++
i = encodeVarintProtos(data, i, uint64(m.LogTerm)) i = encodeVarintRaft(data, i, uint64(m.LogTerm))
data[i] = 0x30 data[i] = 0x30
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Index)) i = encodeVarintRaft(data, i, uint64(m.Index))
if len(m.Entries) > 0 { if len(m.Entries) > 0 {
for _, msg := range m.Entries { for _, msg := range m.Entries {
data[i] = 0x3a data[i] = 0x3a
i++ i++
i = encodeVarintProtos(data, i, uint64(msg.Size())) i = encodeVarintRaft(data, i, uint64(msg.Size()))
n, err := msg.MarshalTo(data[i:]) n, err := msg.MarshalTo(data[i:])
if err != nil { if err != nil {
return 0, err return 0, err
@ -691,10 +817,10 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) {
} }
data[i] = 0x40 data[i] = 0x40
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Commit)) i = encodeVarintRaft(data, i, uint64(m.Commit))
data[i] = 0x4a data[i] = 0x4a
i++ i++
i = encodeVarintProtos(data, i, uint64(m.Snapshot.Size())) i = encodeVarintRaft(data, i, uint64(m.Snapshot.Size()))
n1, err := m.Snapshot.MarshalTo(data[i:]) n1, err := m.Snapshot.MarshalTo(data[i:])
if err != nil { if err != nil {
return 0, err return 0, err
@ -705,7 +831,39 @@ func (m *Message) MarshalTo(data []byte) (n int, err error) {
} }
return i, nil return i, nil
} }
func encodeFixed64Protos(data []byte, offset int, v uint64) int { func (m *State) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *State) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintRaft(data, i, uint64(m.Term))
data[i] = 0x10
i++
i = encodeVarintRaft(data, i, uint64(m.Vote))
data[i] = 0x18
i++
i = encodeVarintRaft(data, i, uint64(m.Commit))
data[i] = 0x20
i++
i = encodeVarintRaft(data, i, uint64(m.LastIndex))
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64Raft(data []byte, offset int, v uint64) int {
data[offset] = uint8(v) data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8) data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16) data[offset+2] = uint8(v >> 16)
@ -716,14 +874,14 @@ func encodeFixed64Protos(data []byte, offset int, v uint64) int {
data[offset+7] = uint8(v >> 56) data[offset+7] = uint8(v >> 56)
return offset + 8 return offset + 8
} }
func encodeFixed32Protos(data []byte, offset int, v uint32) int { func encodeFixed32Raft(data []byte, offset int, v uint32) int {
data[offset] = uint8(v) data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8) data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16) data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24) data[offset+3] = uint8(v >> 24)
return offset + 4 return offset + 4
} }
func encodeVarintProtos(data []byte, offset int, v uint64) int { func encodeVarintRaft(data []byte, offset int, v uint64) int {
for v >= 1<<7 { for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80) data[offset] = uint8(v&0x7f | 0x80)
v >>= 7 v >>= 7

View File

@ -1,4 +1,4 @@
package raft; package raftpb;
import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto"; import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
@ -33,3 +33,10 @@ message Message {
required int64 commit = 8 [(gogoproto.nullable) = false]; required int64 commit = 8 [(gogoproto.nullable) = false];
required Snapshot snapshot = 9 [(gogoproto.nullable) = false]; required Snapshot snapshot = 9 [(gogoproto.nullable) = false];
} }
message State {
required int64 term = 1 [(gogoproto.nullable) = false];
required int64 vote = 2 [(gogoproto.nullable) = false];
required int64 commit = 3 [(gogoproto.nullable) = false];
required int64 lastIndex = 4 [(gogoproto.nullable) = false];
}

View File

@ -1,221 +0,0 @@
// Code generated by protoc-gen-gogo.
// source: state.proto
// DO NOT EDIT!
package raft
import proto "code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import math "math"
// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
import io1 "io"
import code_google_com_p_gogoprotobuf_proto1 "code.google.com/p/gogoprotobuf/proto"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type State struct {
Term int64 `protobuf:"varint,1,req,name=term" json:"term"`
Vote int64 `protobuf:"varint,2,req,name=vote" json:"vote"`
Commit int64 `protobuf:"varint,3,req,name=commit" json:"commit"`
LastIndex int64 `protobuf:"varint,4,req,name=lastIndex" json:"lastIndex"`
XXX_unrecognized []byte `json:"-"`
}
func (m *State) Reset() { *m = State{} }
func (m *State) String() string { return proto.CompactTextString(m) }
func (*State) ProtoMessage() {}
func init() {
}
func (m *State) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io1.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io1.ErrUnexpectedEOF
}
b := data[index]
index++
m.Term |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io1.ErrUnexpectedEOF
}
b := data[index]
index++
m.Vote |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io1.ErrUnexpectedEOF
}
b := data[index]
index++
m.Commit |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto1.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io1.ErrUnexpectedEOF
}
b := data[index]
index++
m.LastIndex |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto1.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io1.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (m *State) Size() (n int) {
var l int
_ = l
n += 1 + sovState(uint64(m.Term))
n += 1 + sovState(uint64(m.Vote))
n += 1 + sovState(uint64(m.Commit))
n += 1 + sovState(uint64(m.LastIndex))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovState(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozState(x uint64) (n int) {
return sovState(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *State) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *State) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintState(data, i, uint64(m.Term))
data[i] = 0x10
i++
i = encodeVarintState(data, i, uint64(m.Vote))
data[i] = 0x18
i++
i = encodeVarintState(data, i, uint64(m.Commit))
data[i] = 0x20
i++
i = encodeVarintState(data, i, uint64(m.LastIndex))
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64State(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
data[offset+4] = uint8(v >> 32)
data[offset+5] = uint8(v >> 40)
data[offset+6] = uint8(v >> 48)
data[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32State(data []byte, offset int, v uint32) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintState(data []byte, offset int, v uint64) int {
for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
data[offset] = uint8(v)
return offset + 1
}

View File

@ -1,15 +0,0 @@
package raft;
import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
message State {
required int64 term = 1 [(gogoproto.nullable) = false];
required int64 vote = 2 [(gogoproto.nullable) = false];
required int64 commit = 3 [(gogoproto.nullable) = false];
required int64 lastIndex = 4 [(gogoproto.nullable) = false];
}