From 09d1575eeb5a0c2f6ddd113ddd11306465eb2922 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 28 May 2014 13:53:26 -0700 Subject: [PATCH] raft: node.tick --- raft/node.go | 64 +++++++++++++++++++++++++++++++++++++++-------- raft/node_test.go | 55 ++++++++++++++++++++++++++++++++++++++++ raft/raft.go | 9 +++++++ 3 files changed, 117 insertions(+), 11 deletions(-) create mode 100644 raft/node_test.go diff --git a/raft/node.go b/raft/node.go index ad1aa6509..f2cefe192 100644 --- a/raft/node.go +++ b/raft/node.go @@ -1,20 +1,35 @@ package raft -import "sync" - type Interface interface { Step(m Message) } +type tick int + type Node struct { - lk sync.Mutex - sm *stateMachine + // election timeout and heartbeat timeout in tick + election tick + heartbeat tick + + // elapsed ticks after the last reset + elapsed tick + sm *stateMachine + + next Interface } -func New(k, addr int, next Interface) *Node { - n := &Node{ - sm: newStateMachine(k, addr), +func New(k, addr int, heartbeat, election tick, next Interface) *Node { + if election < heartbeat*3 { + panic("election is least three times as heartbeat [election: %d, heartbeat: %d]") } + + n := &Node{ + sm: newStateMachine(k, addr), + next: next, + heartbeat: heartbeat, + election: election, + } + return n } @@ -25,15 +40,42 @@ func (n *Node) Propose(data []byte) { } func (n *Node) Step(m Message) { - n.lk.Lock() - defer n.lk.Unlock() n.sm.Step(m) + ms := n.sm.Msgs() + for _, m := range ms { + // reset elapsed in two cases: + // msgAppResp -> heard from the leader of the same term + // msgVoteResp with grant -> heard from the candidate the node voted for + switch m.Type { + case msgAppResp: + n.elapsed = 0 + case msgVoteResp: + if m.Index >= 0 { + n.elapsed = 0 + } + } + n.next.Step(m) + } } // Next advances the commit index and returns any new // commitable entries. func (n *Node) Next() []Entry { - n.lk.Lock() - defer n.lk.Unlock() return n.sm.nextEnts() } + +// Tick triggers the node to do a tick. +// If the current elapsed is greater or equal than the timeout, +// node will send corresponding message to the statemachine. +func (n *Node) Tick() { + timeout, msgType := n.election, msgHup + if n.sm.state == stateLeader { + timeout, msgType = n.heartbeat, msgBeat + } + if n.elapsed >= timeout { + n.Step(Message{Type: msgType}) + n.elapsed = 0 + } else { + n.elapsed++ + } +} diff --git a/raft/node_test.go b/raft/node_test.go new file mode 100644 index 000000000..d56188376 --- /dev/null +++ b/raft/node_test.go @@ -0,0 +1,55 @@ +package raft + +import "testing" + +const ( + defaultHeartbeat = 1 + defaultElection = 5 +) + +func TestTickMsgHub(t *testing.T) { + n := New(3, 0, defaultHeartbeat, defaultElection, nil) + + called := false + n.next = stepperFunc(func(m Message) { + if m.Type == msgVote { + called = true + } + }) + + for i := 0; i < defaultElection+1; i++ { + n.Tick() + } + + if !called { + t.Errorf("called = %v, want true", called) + } +} + +func TestTickMsgBeat(t *testing.T) { + k := 3 + n := New(k, 0, defaultHeartbeat, defaultElection, nil) + + called := 0 + n.next = stepperFunc(func(m Message) { + if m.Type == msgApp { + called++ + } + if m.Type == msgVote { + n.Step(Message{From: 1, Type: msgVoteResp, Index: 1, Term: 1}) + } + }) + + n.Step(Message{Type: msgHup}) // become leader please + + for i := 0; i < defaultHeartbeat+1; i++ { + n.Tick() + } + + // becomeLeader -> k-1 append + // msgBeat -> k-1 append + w := (k - 1) * 2 + if called != w { + t.Errorf("called = %v, want %v", called, w) + } +} diff --git a/raft/raft.go b/raft/raft.go index 9a843447c..e62553610 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -11,6 +11,7 @@ type messageType int const ( msgHup messageType = iota + msgBeat msgProp msgApp msgAppResp @@ -20,6 +21,7 @@ const ( var mtmap = [...]string{ msgHup: "msgHup", + msgBeat: "msgBeat", msgProp: "msgProp", msgApp: "msgApp", msgAppResp: "msgAppResp", @@ -247,6 +249,13 @@ func (sm *stateMachine) Step(m Message) { sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)}) } return + case msgBeat: + if sm.state != stateLeader { + return + } + // todo(xiangli) broadcast append + // blocker github issue #13 + sm.sendAppend() case msgProp: switch sm.lead { case sm.addr: