From d7d6f84f6477662d6a05dd963b8004f417f35fa1 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 7 Oct 2014 20:12:49 +0800 Subject: [PATCH 1/3] raft: rand election timeout --- raft/node_test.go | 6 +++--- raft/raft.go | 17 +++++++++++++++-- raft/raft_test.go | 44 ++++++++++++++++++++++---------------------- 3 files changed, 40 insertions(+), 27 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index 64db7d649..3bf5c628b 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -175,7 +175,7 @@ func TestNode(t *testing.T) { }, } - n := StartNode(1, []int64{1}, 0, 0) + n := StartNode(1, []int64{1}, 10, 1) n.Campaign(ctx) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) @@ -207,7 +207,7 @@ func TestNodeRestart(t *testing.T) { CommittedEntries: entries[1 : st.Commit+1], } - n := RestartNode(1, []int64{1}, 0, 0, nil, st, entries) + n := RestartNode(1, []int64{1}, 10, 1, nil, st, entries) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } @@ -224,7 +224,7 @@ func TestNodeRestart(t *testing.T) { func TestNodeCompact(t *testing.T) { ctx := context.Background() n := newNode() - r := newRaft(1, []int64{1}, 0, 0) + r := newRaft(1, []int64{1}, 10, 1) go n.run(r) n.Campaign(ctx) diff --git a/raft/raft.go b/raft/raft.go index 91c3aad08..616302ca6 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -3,7 +3,9 @@ package raft import ( "errors" "fmt" + "math/rand" "sort" + "time" pb "github.com/coreos/etcd/raft/raftpb" ) @@ -132,6 +134,7 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft { if id == None { panic("cannot use none id") } + rand.Seed(time.Now().UnixNano()) r := &raft{ id: id, lead: None, @@ -286,8 +289,7 @@ func (r *raft) tickElection() { return } r.elapsed++ - // TODO (xiangli): elctionTimeout should be randomized. 
- if r.elapsed > r.electionTimeout { + if r.isElectionTimeout() { r.elapsed = 0 r.Step(pb.Message{From: r.id, Type: msgHup}) } @@ -585,3 +587,14 @@ func (r *raft) loadState(state pb.HardState) { r.Vote = state.Vote r.Commit = state.Commit } + +// isElectionTimeout returns true if r.elapsed is greater than the +// randomized election timeout in [electiontimeout, 2 * electiontimeout - 1). +// Otherwise, it returns false. +func (r *raft) isElectionTimeout() bool { + d := r.elapsed - r.electionTimeout + if d < 0 { + return false + } + return d > int(rand.Int31())%r.electionTimeout +} diff --git a/raft/raft_test.go b/raft/raft_test.go index 1923c553c..2f424629d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -201,9 +201,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { } func TestDuelingCandidates(t *testing.T) { - a := newRaft(-1, nil, 0, 0) // k, id are set later - b := newRaft(-1, nil, 0, 0) - c := newRaft(-1, nil, 0, 0) + a := newRaft(-1, nil, 10, 1) // k, id are set later + b := newRaft(-1, nil, 10, 1) + c := newRaft(-1, nil, 10, 1) nt := newNetwork(a, b, c) nt.cut(1, 3) @@ -499,7 +499,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - sm := newRaft(1, []int64{1}, 0, 0) + sm := newRaft(1, []int64{1}, 10, 1) sm.step = fakeStep sm.Term = 2 sm.Step(pb.Message{Type: msgApp, Term: sm.Term - 1}) @@ -597,7 +597,7 @@ func TestRecvMsgVote(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []int64{1}, 0, 0) + sm := newRaft(1, []int64{1}, 10, 1) sm.state = tt.state switch tt.state { case StateFollower: @@ -654,7 +654,7 @@ func TestStateTransition(t *testing.T) { } }() - sm := newRaft(1, []int64{1}, 0, 0) + sm := newRaft(1, []int64{1}, 10, 1) sm.state = tt.from switch tt.to { @@ -693,7 +693,7 @@ func TestAllServerStepdown(t *testing.T) { tterm := int64(3) for i, tt := range tests { - sm := newRaft(1, []int64{1, 2, 3}, 0, 0) + sm := newRaft(1, []int64{1, 2, 3}, 10, 1) switch tt.state { case 
StateFollower: sm.becomeFollower(1, None) @@ -743,7 +743,7 @@ func TestLeaderAppResp(t *testing.T) { for i, tt := range tests { // sm term is 1 after it becomes the leader. // thus the last log term must be 1 to be committed. - sm := newRaft(1, []int64{1, 2, 3}, 0, 0) + sm := newRaft(1, []int64{1, 2, 3}, 10, 1) sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}} sm.becomeCandidate() sm.becomeLeader() @@ -775,7 +775,7 @@ func TestBcastBeat(t *testing.T) { Term: 1, Nodes: []int64{1, 2, 3}, } - sm := newRaft(1, []int64{1, 2, 3}, 0, 0) + sm := newRaft(1, []int64{1, 2, 3}, 10, 1) sm.Term = 1 sm.restore(s) @@ -825,7 +825,7 @@ func TestRecvMsgBeat(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []int64{1, 2, 3}, 0, 0) + sm := newRaft(1, []int64{1, 2, 3}, 10, 1) sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}} sm.Term = 1 sm.state = tt.state @@ -858,7 +858,7 @@ func TestRestore(t *testing.T) { Nodes: []int64{1, 2, 3}, } - sm := newRaft(1, []int64{1, 2}, 0, 0) + sm := newRaft(1, []int64{1, 2}, 10, 1) if ok := sm.restore(s); !ok { t.Fatal("restore fail, want succeed") } @@ -891,7 +891,7 @@ func TestProvideSnap(t *testing.T) { Term: defaultCompactThreshold + 1, Nodes: []int64{1, 2}, } - sm := newRaft(1, []int64{1}, 0, 0) + sm := newRaft(1, []int64{1}, 10, 1) // restore the statemachin from a snapshot // so it has a compacted log and a snapshot sm.restore(s) @@ -922,7 +922,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { } m := pb.Message{Type: msgSnap, From: 1, Term: 2, Snapshot: s} - sm := newRaft(2, []int64{1, 2}, 0, 0) + sm := newRaft(2, []int64{1, 2}, 10, 1) sm.Step(m) if !reflect.DeepEqual(sm.raftLog.snapshot, s) { @@ -961,7 +961,7 @@ func TestSlowNodeRestore(t *testing.T) { // it appends the entry to log and sets pendingConf to be true. 
func TestStepConfig(t *testing.T) { // a raft that cannot make progress - r := newRaft(1, []int64{1, 2}, 0, 0) + r := newRaft(1, []int64{1, 2}, 10, 1) r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() @@ -979,7 +979,7 @@ func TestStepConfig(t *testing.T) { // the proposal and keep its original state. func TestStepIgnoreConfig(t *testing.T) { // a raft that cannot make progress - r := newRaft(1, []int64{1, 2}, 0, 0) + r := newRaft(1, []int64{1, 2}, 10, 1) r.becomeCandidate() r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) @@ -1005,7 +1005,7 @@ func TestRecoverPendingConfig(t *testing.T) { {pb.EntryConfChange, true}, } for i, tt := range tests { - r := newRaft(1, []int64{1, 2}, 0, 0) + r := newRaft(1, []int64{1, 2}, 10, 1) r.appendEntry(pb.Entry{Type: tt.entType}) r.becomeCandidate() r.becomeLeader() @@ -1024,7 +1024,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { t.Errorf("expect panic, but nothing happens") } }() - r := newRaft(1, []int64{1, 2}, 0, 0) + r := newRaft(1, []int64{1, 2}, 10, 1) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.becomeCandidate() @@ -1034,7 +1034,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { // TestAddNode tests that addNode could update pendingConf and nodes correctly. func TestAddNode(t *testing.T) { - r := newRaft(1, []int64{1}, 0, 0) + r := newRaft(1, []int64{1}, 10, 1) r.pendingConf = true r.addNode(2) if r.pendingConf != false { @@ -1051,7 +1051,7 @@ func TestAddNode(t *testing.T) { // TestRemoveNode tests that removeNode could update pendingConf, nodes and // and removed list correctly. 
func TestRemoveNode(t *testing.T) { - r := newRaft(1, []int64{1, 2}, 0, 0) + r := newRaft(1, []int64{1, 2}, 10, 1) r.pendingConf = true r.removeNode(2) if r.pendingConf != false { @@ -1074,7 +1074,7 @@ func TestRecvMsgDenied(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - r := newRaft(1, []int64{1, 2}, 0, 0) + r := newRaft(1, []int64{1, 2}, 10, 1) r.step = fakeStep r.Step(pb.Message{From: 2, Type: msgDenied}) if called != false { @@ -1102,7 +1102,7 @@ func TestRecvMsgFromRemovedNode(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - r := newRaft(1, []int64{1}, 0, 0) + r := newRaft(1, []int64{1}, 10, 1) r.step = fakeStep r.removeNode(tt.from) r.Step(pb.Message{From: tt.from, Type: msgVote}) @@ -1176,7 +1176,7 @@ func newNetwork(peers ...Interface) *network { id := peerAddrs[i] switch v := p.(type) { case nil: - sm := newRaft(id, peerAddrs, 0, 0) + sm := newRaft(id, peerAddrs, 10, 1) npeers[id] = sm case *raft: v.id = id From f65d11746288130d9deb2d4b00dd8996e275c4f0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 7 Oct 2014 20:34:15 +0800 Subject: [PATCH 2/3] raft: add a test for randElectionTimeout --- raft/raft_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/raft/raft_test.go b/raft/raft_test.go index 2f424629d..adca3ad1d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -3,6 +3,7 @@ package raft import ( "bytes" "fmt" + "math" "math/rand" "reflect" "sort" @@ -492,6 +493,38 @@ func TestCommit(t *testing.T) { } } +func TestIsElectionTimeout(t *testing.T) { + tests := []struct { + elapse int + wpossibility float64 + round bool + }{ + {5, 0, false}, + {13, 0.3, true}, + {15, 0.5, true}, + {18, 0.8, true}, + {20, 1, false}, + } + + for i, tt := range tests { + sm := newRaft(1, []int64{1}, 10, 1) + sm.elapsed = tt.elapse + c := 0 + for j := 0; j < 10000; j++ { + if sm.isElectionTimeout() { + c++ + } + } + got := float64(c) / 10000.0 + if tt.round { + got = 
math.Floor(got*10+0.5) / 10.0 + } + if got != tt.wpossibility { + t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wpossibility) + } + } +} + // ensure that the Step function ignores the message from old term and does not pass it to the // acutal stepX function. func TestStepIgnoreOldTermMsg(t *testing.T) { From 1cd3345e00636fba53553d23f76a6a33b6c35537 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 8 Oct 2014 07:41:17 +0800 Subject: [PATCH 3/3] raft: address issues with election timeout --- raft/raft.go | 7 +++---- raft/raft_test.go | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 616302ca6..be2c5d56e 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -5,7 +5,6 @@ import ( "fmt" "math/rand" "sort" - "time" pb "github.com/coreos/etcd/raft/raftpb" ) @@ -134,7 +133,7 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft { if id == None { panic("cannot use none id") } - rand.Seed(time.Now().UnixNano()) + rand.Seed(id) r := &raft{ id: id, lead: None, @@ -589,12 +588,12 @@ func (r *raft) loadState(state pb.HardState) { } // isElectionTimeout returns true if r.elapsed is greater than the -// randomized election timeout in [electiontimeout, 2 * electiontimeout - 1). +// randomized election timeout in [electiontimeout, 2 * electiontimeout - 1]. // Otherwise, it returns false. 
func (r *raft) isElectionTimeout() bool { d := r.elapsed - r.electionTimeout if d < 0 { return false } - return d > int(rand.Int31())%r.electionTimeout + return d > rand.Int()%r.electionTimeout } diff --git a/raft/raft_test.go b/raft/raft_test.go index adca3ad1d..57550838b 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -496,7 +496,7 @@ func TestCommit(t *testing.T) { func TestIsElectionTimeout(t *testing.T) { tests := []struct { elapse int - wpossibility float64 + wprobability float64 round bool }{ {5, 0, false}, @@ -519,8 +519,8 @@ func TestIsElectionTimeout(t *testing.T) { if tt.round { got = math.Floor(got*10+0.5) / 10.0 } - if got != tt.wpossibility { - t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wpossibility) + if got != tt.wprobability { + t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability) } } }