This commit is contained in:
Blake Mizerany 2014-09-02 16:59:29 -07:00 committed by Yicheng Qin
parent fa11eef6d0
commit d218034630
7 changed files with 94 additions and 44 deletions

View File

@ -42,6 +42,8 @@ type Server struct {
// Save MUST block until st and ents are on stable storage. If Send is
// nil, Server will panic.
Save func(st raftpb.State, ents []raftpb.Entry)
Ticker <-chan time.Time
}
// Start prepares and starts server in a new goroutine. It is no longer safe to
@ -55,6 +57,8 @@ func Start(s *Server) {
func (s *Server) run() {
for {
select {
case <-s.Ticker:
s.Node.Tick()
case rd := <-s.Node.Ready():
s.Save(rd.State, rd.Entries)
s.Send(rd.Messages)

View File

@ -35,13 +35,15 @@ func testServer(t *testing.T, ns int64) {
}
for i := int64(0); i < ns; i++ {
n := raft.Start(i, peers)
n := raft.Start(i, peers, 1, 10)
tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop()
srv := &Server{
Node: n,
Store: store.New(),
Send: send,
Save: func(_ raftpb.State, _ []raftpb.Entry) {},
Node: n,
Store: store.New(),
Send: send,
Save: func(_ raftpb.State, _ []raftpb.Entry) {},
Ticker: tk.C,
}
Start(srv)

View File

@ -10,7 +10,7 @@ func saveStateToDisk(st pb.State) {}
func saveToDisk(ents []pb.Entry) {}
func Example_Node() {
n := Start(0, nil)
n := Start(0, nil, 0, 0)
// stuff to n happens in other goroutines

View File

@ -47,7 +47,7 @@ type Node struct {
done chan struct{}
}
func Start(id int64, peers []int64) Node {
func Start(id int64, peers []int64, election, heartbeat int) Node {
n := Node{
propc: make(chan pb.Message),
recvc: make(chan pb.Message),
@ -56,7 +56,7 @@ func Start(id int64, peers []int64) Node {
alwaysreadyc: make(chan Ready),
done: make(chan struct{}),
}
r := newRaft(id, peers)
r := newRaft(id, peers, election, heartbeat)
go n.run(r)
return n
}
@ -103,7 +103,7 @@ func (n *Node) run(r *raft) {
case m := <-n.recvc:
r.Step(m) // raft never returns an error
case <-n.tickc:
// r.tick()
r.tick()
case readyc <- rd:
r.raftLog.resetNextEnts()
r.raftLog.resetUnstable()

View File

@ -12,7 +12,7 @@ func TestNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
n := Start(1, []int64{1})
n := Start(1, []int64{1}, 0, 0)
ch := make(chan Ready)
go func() {
for {

View File

@ -121,17 +121,29 @@ type raft struct {
// New machine has to wait until it has been added to the cluster, or it
// may become the leader of the cluster without it.
promotable bool
elapsed int
heartbeatTimeout int
electionTimeout int
tick func()
}
func newRaft(id int64, peers []int64) *raft {
func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
if id == none {
panic("cannot use none id")
}
r := &raft{id: id, lead: none, raftLog: newLog(), prs: make(map[int64]*progress)}
r := &raft{
id: id,
lead: none,
raftLog: newLog(),
prs: make(map[int64]*progress),
electionTimeout: election,
heartbeatTimeout: heartbeat,
}
for _, p := range peers {
r.prs[p] = &progress{}
}
r.reset(0)
r.becomeFollower(0, none)
return r
}
@ -258,7 +270,29 @@ func (r *raft) appendEntry(e pb.Entry) {
r.maybeCommit()
}
func (r *raft) tickElection() {
r.elapsed++
if r.elapsed > r.electionTimeout {
r.elapsed = 0
r.campaign()
}
}
func (r *raft) tickHeartbeat() {
r.elapsed++
if r.elapsed > r.heartbeatTimeout {
r.elapsed = 0
r.bcastHeartbeat()
}
}
func (r *raft) setTick(f func()) {
r.elapsed = 0
r.tick = f
}
func (r *raft) becomeFollower(term int64, lead int64) {
r.setTick(r.tickElection)
r.reset(term)
r.lead = lead
r.state = stateFollower
@ -266,6 +300,7 @@ func (r *raft) becomeFollower(term int64, lead int64) {
}
func (r *raft) becomeCandidate() {
r.setTick(r.tickElection)
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == stateLeader {
panic("invalid transition [leader -> candidate]")
@ -276,6 +311,7 @@ func (r *raft) becomeCandidate() {
}
func (r *raft) becomeLeader() {
r.setTick(r.tickHeartbeat)
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == stateFollower {
panic("invalid transition [follower -> leader]")
@ -300,22 +336,26 @@ func (r *raft) ReadMessages() []pb.Message {
return msgs
}
func (r *raft) campaign() {
r.becomeCandidate()
if r.q() == r.poll(r.id, true) {
r.becomeLeader()
}
for i := range r.prs {
if i == r.id {
continue
}
lasti := r.raftLog.lastIndex()
r.send(pb.Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)})
}
}
func (r *raft) Step(m pb.Message) error {
// TODO(bmizerany): this likely allocs - prevent that.
defer func() { r.Commit = r.raftLog.committed }()
if m.Type == msgHup {
r.becomeCandidate()
if r.q() == r.poll(r.id, true) {
r.becomeLeader()
}
for i := range r.prs {
if i == r.id {
continue
}
lasti := r.raftLog.lastIndex()
r.send(pb.Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)})
}
r.campaign()
}
switch {
@ -404,6 +444,7 @@ func stepCandidate(r *raft, m pb.Message) {
case msgProp:
panic("no leader")
case msgApp:
r.elapsed = 0
r.becomeFollower(r.Term, m.From)
r.handleAppendEntries(m)
case msgSnap:
@ -432,11 +473,14 @@ func stepFollower(r *raft, m pb.Message) {
m.To = r.lead
r.send(m)
case msgApp:
r.elapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
case msgSnap:
r.elapsed = 0
r.handleSnapshot(m)
case msgVote:
// TODO(xiang): maybe reset elapsed?
if (r.Vote == none || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.Vote = m.From
r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: r.raftLog.lastIndex()})

View File

@ -210,9 +210,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
}
func TestDuelingCandidates(t *testing.T) {
a := newRaft(0, nil) // k, id are set later
b := newRaft(0, nil)
c := newRaft(0, nil)
a := newRaft(0, nil, 0, 0) // k, id are set later
b := newRaft(0, nil, 0, 0)
c := newRaft(0, nil, 0, 0)
nt := newNetwork(a, b, c)
nt.cut(0, 2)
@ -488,9 +488,9 @@ func TestHandleMsgApp(t *testing.T) {
for i, tt := range tests {
sm := &raft{
state: stateFollower,
State: pb.State{Term: 2},
raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
state: stateFollower,
State: pb.State{Term: 2},
raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
}
sm.handleAppendEntries(tt.m)
@ -550,9 +550,9 @@ func TestRecvMsgVote(t *testing.T) {
for i, tt := range tests {
sm := &raft{
state: tt.state,
State: pb.State{Vote: tt.voteFor},
raftLog: &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}},
state: tt.state,
State: pb.State{Vote: tt.voteFor},
raftLog: &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}},
}
sm.Step(pb.Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term})
@ -599,7 +599,7 @@ func TestStateTransition(t *testing.T) {
}
}()
sm := newRaft(0, []int64{0})
sm := newRaft(0, []int64{0}, 0, 0)
sm.state = tt.from
switch tt.to {
@ -622,7 +622,7 @@ func TestStateTransition(t *testing.T) {
}
func TestConf(t *testing.T) {
sm := newRaft(0, []int64{0})
sm := newRaft(0, []int64{0}, 0, 0)
sm.becomeCandidate()
sm.becomeLeader()
@ -662,7 +662,7 @@ func TestConfChangeLeader(t *testing.T) {
}
for i, tt := range tests {
sm := newRaft(0, []int64{0})
sm := newRaft(0, []int64{0}, 0, 0)
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Type: tt.et}}}
sm.becomeCandidate()
@ -691,7 +691,7 @@ func TestAllServerStepdown(t *testing.T) {
tterm := int64(3)
for i, tt := range tests {
sm := newRaft(0, []int64{0, 1, 2})
sm := newRaft(0, []int64{0, 1, 2}, 0, 0)
switch tt.state {
case stateFollower:
sm.becomeFollower(1, 0)
@ -739,7 +739,7 @@ func TestLeaderAppResp(t *testing.T) {
for i, tt := range tests {
// sm term is 1 after it becomes the leader.
// thus the last log term must be 1 to be committed.
sm := newRaft(0, []int64{0, 1, 2})
sm := newRaft(0, []int64{0, 1, 2}, 0, 0)
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
sm.becomeCandidate()
sm.becomeLeader()
@ -774,7 +774,7 @@ func TestRecvMsgBeat(t *testing.T) {
}
for i, tt := range tests {
sm := newRaft(0, []int64{0, 1, 2})
sm := newRaft(0, []int64{0, 1, 2}, 0, 0)
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
sm.Term = 1
sm.state = tt.state
@ -799,7 +799,7 @@ func TestRestore(t *testing.T) {
Nodes: []int64{0, 1, 2},
}
sm := newRaft(0, []int64{0, 1})
sm := newRaft(0, []int64{0, 1}, 0, 0)
if ok := sm.restore(s); !ok {
t.Fatal("restore fail, want succeed")
}
@ -832,7 +832,7 @@ func TestProvideSnap(t *testing.T) {
Term: defaultCompactThreshold + 1,
Nodes: []int64{0, 1},
}
sm := newRaft(0, []int64{0})
sm := newRaft(0, []int64{0}, 0, 0)
// restore the statemachin from a snapshot
// so it has a compacted log and a snapshot
sm.restore(s)
@ -873,7 +873,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
}
m := pb.Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s}
sm := newRaft(1, []int64{0, 1})
sm := newRaft(1, []int64{0, 1}, 0, 0)
sm.Step(m)
if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
@ -942,7 +942,7 @@ func newNetwork(peers ...Interface) *network {
nid := int64(id)
switch v := p.(type) {
case nil:
sm := newRaft(nid, defaultPeerAddrs)
sm := newRaft(nid, defaultPeerAddrs, 0, 0)
npeers[nid] = sm
case *raft:
v.id = nid