From d218034630e5e8d8eab3adb3fbd08d3948ffbf9f Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Tue, 2 Sep 2014 16:59:29 -0700 Subject: [PATCH] boom --- etcdserver2/server.go | 4 +++ etcdserver2/server_test.go | 14 ++++---- raft/example_test.go | 2 +- raft/node.go | 6 ++-- raft/node_test.go | 2 +- raft/raft.go | 72 ++++++++++++++++++++++++++++++-------- raft/raft_test.go | 38 ++++++++++---------- 7 files changed, 94 insertions(+), 44 deletions(-) diff --git a/etcdserver2/server.go b/etcdserver2/server.go index 4905647f2..2a6828d06 100644 --- a/etcdserver2/server.go +++ b/etcdserver2/server.go @@ -42,6 +42,8 @@ type Server struct { // Save MUST block until st and ents are on stable storage. If Send is // nil, Server will panic. Save func(st raftpb.State, ents []raftpb.Entry) + + Ticker <-chan time.Time } // Start prepares and starts server in a new goroutine. It is no longer safe to @@ -55,6 +57,8 @@ func Start(s *Server) { func (s *Server) run() { for { select { + case <-s.Ticker: + s.Node.Tick() case rd := <-s.Node.Ready(): s.Save(rd.State, rd.Entries) s.Send(rd.Messages) diff --git a/etcdserver2/server_test.go b/etcdserver2/server_test.go index 5f508bb10..ed02ec1f8 100644 --- a/etcdserver2/server_test.go +++ b/etcdserver2/server_test.go @@ -35,13 +35,15 @@ func testServer(t *testing.T, ns int64) { } for i := int64(0); i < ns; i++ { - n := raft.Start(i, peers) - + n := raft.Start(i, peers, 1, 10) + tk := time.NewTicker(10 * time.Millisecond) + defer tk.Stop() srv := &Server{ - Node: n, - Store: store.New(), - Send: send, - Save: func(_ raftpb.State, _ []raftpb.Entry) {}, + Node: n, + Store: store.New(), + Send: send, + Save: func(_ raftpb.State, _ []raftpb.Entry) {}, + Ticker: tk.C, } Start(srv) diff --git a/raft/example_test.go b/raft/example_test.go index 0a36ad0e3..efe775d8c 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -10,7 +10,7 @@ func saveStateToDisk(st pb.State) {} func saveToDisk(ents []pb.Entry) {} func Example_Node() { - n := Start(0, nil) + n := Start(0, nil, 0, 0) // stuff to n happens in other goroutines diff --git a/raft/node.go b/raft/node.go index 784bd54c6..12d265713 100644 --- a/raft/node.go +++ b/raft/node.go @@ -47,7 +47,7 @@ type Node struct { done chan struct{} } -func Start(id int64, peers []int64) Node { +func Start(id int64, peers []int64, election, heartbeat int) Node { n := Node{ propc: make(chan pb.Message), recvc: make(chan pb.Message), @@ -56,7 +56,7 @@ func Start(id int64, peers []int64) Node { alwaysreadyc: make(chan Ready), done: make(chan struct{}), } - r := newRaft(id, peers) + r := newRaft(id, peers, election, heartbeat) go n.run(r) return n } @@ -103,7 +103,7 @@ func (n *Node) run(r *raft) { case m := <-n.recvc: r.Step(m) // raft never returns an error case <-n.tickc: - // r.tick() + r.tick() case readyc <- rd: r.raftLog.resetNextEnts() r.raftLog.resetUnstable() diff --git a/raft/node_test.go b/raft/node_test.go index ee6261fbb..51a966b08 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -12,7 +12,7 @@ func TestNode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - n := Start(1, []int64{1}) + n := Start(1, []int64{1}, 0, 0) ch := make(chan Ready) go func() { for { diff --git a/raft/raft.go b/raft/raft.go index f677adaa5..1202e1b18 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -121,17 +121,29 @@ type raft struct { // New machine has to wait until it has been added to the cluster, or it // may become the leader of the cluster without it. promotable bool + + elapsed int + heartbeatTimeout int + electionTimeout int + tick func() } -func newRaft(id int64, peers []int64) *raft { +func newRaft(id int64, peers []int64, election, heartbeat int) *raft { if id == none { panic("cannot use none id") } - r := &raft{id: id, lead: none, raftLog: newLog(), prs: make(map[int64]*progress)} + r := &raft{ + id: id, + lead: none, + raftLog: newLog(), + prs: make(map[int64]*progress), + electionTimeout: election, + heartbeatTimeout: heartbeat, + } for _, p := range peers { r.prs[p] = &progress{} } - r.reset(0) + r.becomeFollower(0, none) return r } @@ -258,7 +270,29 @@ func (r *raft) appendEntry(e pb.Entry) { r.maybeCommit() } +func (r *raft) tickElection() { + r.elapsed++ + if r.elapsed > r.electionTimeout { + r.elapsed = 0 + r.campaign() + } +} + +func (r *raft) tickHeartbeat() { + r.elapsed++ + if r.elapsed > r.heartbeatTimeout { + r.elapsed = 0 + r.bcastHeartbeat() + } +} + +func (r *raft) setTick(f func()) { + r.elapsed = 0 + r.tick = f +} + func (r *raft) becomeFollower(term int64, lead int64) { + r.setTick(r.tickElection) r.reset(term) r.lead = lead r.state = stateFollower @@ -266,6 +300,7 @@ func (r *raft) becomeFollower(term int64, lead int64) { } func (r *raft) becomeCandidate() { + r.setTick(r.tickElection) // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == stateLeader { panic("invalid transition [leader -> candidate]") @@ -276,6 +311,7 @@ func (r *raft) becomeCandidate() { } func (r *raft) becomeLeader() { + r.setTick(r.tickHeartbeat) // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == stateFollower { panic("invalid transition [follower -> leader]") @@ -300,22 +336,26 @@ func (r *raft) ReadMessages() []pb.Message { return msgs } +func (r *raft) campaign() { + r.becomeCandidate() + if r.q() == r.poll(r.id, true) { + r.becomeLeader() + } + for i := range r.prs { + if i == r.id { + continue + } + lasti := r.raftLog.lastIndex() + r.send(pb.Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) + } +} + func (r *raft) Step(m pb.Message) error { // TODO(bmizerany): this likely allocs - prevent that. defer func() { r.Commit = r.raftLog.committed }() if m.Type == msgHup { - r.becomeCandidate() - if r.q() == r.poll(r.id, true) { - r.becomeLeader() - } - for i := range r.prs { - if i == r.id { - continue - } - lasti := r.raftLog.lastIndex() - r.send(pb.Message{To: i, Type: msgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) - } + r.campaign() } switch { @@ -404,6 +444,7 @@ func stepCandidate(r *raft, m pb.Message) { case msgProp: panic("no leader") case msgApp: + r.elapsed = 0 r.becomeFollower(r.Term, m.From) r.handleAppendEntries(m) case msgSnap: @@ -432,11 +473,14 @@ func stepFollower(r *raft, m pb.Message) { m.To = r.lead r.send(m) case msgApp: + r.elapsed = 0 r.lead = m.From r.handleAppendEntries(m) case msgSnap: + r.elapsed = 0 r.handleSnapshot(m) case msgVote: + // TODO(xiang): maybe reset elapsed? if (r.Vote == none || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.Vote = m.From r.send(pb.Message{To: m.From, Type: msgVoteResp, Index: r.raftLog.lastIndex()}) diff --git a/raft/raft_test.go b/raft/raft_test.go index 7be7250c4..d3e549a24 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -210,9 +210,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { } func TestDuelingCandidates(t *testing.T) { - a := newRaft(0, nil) // k, id are set later - b := newRaft(0, nil) - c := newRaft(0, nil) + a := newRaft(0, nil, 0, 0) // k, id are set later + b := newRaft(0, nil, 0, 0) + c := newRaft(0, nil, 0, 0) nt := newNetwork(a, b, c) nt.cut(0, 2) @@ -488,9 +488,9 @@ func TestHandleMsgApp(t *testing.T) { for i, tt := range tests { sm := &raft{ - state: stateFollower, - State: pb.State{Term: 2}, - raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}}, + state: stateFollower, + State: pb.State{Term: 2}, + raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}}, } sm.handleAppendEntries(tt.m) @@ -550,9 +550,9 @@ func TestRecvMsgVote(t *testing.T) { for i, tt := range tests { sm := &raft{ - state: tt.state, - State: pb.State{Vote: tt.voteFor}, - raftLog: &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}, + state: tt.state, + State: pb.State{Vote: tt.voteFor}, + raftLog: &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}, } sm.Step(pb.Message{Type: msgVote, From: 1, Index: tt.i, LogTerm: tt.term}) @@ -599,7 +599,7 @@ func TestStateTransition(t *testing.T) { } }() - sm := newRaft(0, []int64{0}) + sm := newRaft(0, []int64{0}, 0, 0) sm.state = tt.from switch tt.to { @@ -622,7 +622,7 @@ func TestStateTransition(t *testing.T) { } func TestConf(t *testing.T) { - sm := newRaft(0, []int64{0}) + sm := newRaft(0, []int64{0}, 0, 0) sm.becomeCandidate() sm.becomeLeader() @@ -662,7 +662,7 @@ func TestConfChangeLeader(t *testing.T) { } for i, tt := range tests { - sm := newRaft(0, []int64{0}) + sm := newRaft(0, []int64{0}, 0, 0) sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Type: tt.et}}} sm.becomeCandidate() @@ -691,7 +691,7 @@ func TestAllServerStepdown(t *testing.T) { tterm := int64(3) for i, tt := range tests { - sm := newRaft(0, []int64{0, 1, 2}) + sm := newRaft(0, []int64{0, 1, 2}, 0, 0) switch tt.state { case stateFollower: sm.becomeFollower(1, 0) @@ -739,7 +739,7 @@ func TestLeaderAppResp(t *testing.T) { for i, tt := range tests { // sm term is 1 after it becomes the leader. // thus the last log term must be 1 to be committed. - sm := newRaft(0, []int64{0, 1, 2}) + sm := newRaft(0, []int64{0, 1, 2}, 0, 0) sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}} sm.becomeCandidate() sm.becomeLeader() @@ -774,7 +774,7 @@ func TestRecvMsgBeat(t *testing.T) { } for i, tt := range tests { - sm := newRaft(0, []int64{0, 1, 2}) + sm := newRaft(0, []int64{0, 1, 2}, 0, 0) sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}} sm.Term = 1 sm.state = tt.state @@ -799,7 +799,7 @@ func TestRestore(t *testing.T) { Nodes: []int64{0, 1, 2}, } - sm := newRaft(0, []int64{0, 1}) + sm := newRaft(0, []int64{0, 1}, 0, 0) if ok := sm.restore(s); !ok { t.Fatal("restore fail, want succeed") } @@ -832,7 +832,7 @@ func TestProvideSnap(t *testing.T) { Term: defaultCompactThreshold + 1, Nodes: []int64{0, 1}, } - sm := newRaft(0, []int64{0}) + sm := newRaft(0, []int64{0}, 0, 0) // restore the statemachin from a snapshot // so it has a compacted log and a snapshot sm.restore(s) @@ -873,7 +873,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { } m := pb.Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s} - sm := newRaft(1, []int64{0, 1}) + sm := newRaft(1, []int64{0, 1}, 0, 0) sm.Step(m) if !reflect.DeepEqual(sm.raftLog.snapshot, s) { @@ -942,7 +942,7 @@ func newNetwork(peers ...Interface) *network { nid := int64(id) switch v := p.(type) { case nil: - sm := newRaft(nid, defaultPeerAddrs) + sm := newRaft(nid, defaultPeerAddrs, 0, 0) npeers[nid] = sm case *raft: v.id = nid