diff --git a/raft/node.go b/raft/node.go index cb97feb13..85a2bb6de 100644 --- a/raft/node.go +++ b/raft/node.go @@ -47,7 +47,7 @@ func (n *Node) Id() int64 { func (n *Node) Index() int64 { return n.sm.log.lastIndex() } -func (n *Node) Term() int64 { return n.sm.term } +func (n *Node) Term() int64 { return n.sm.term.Get() } func (n *Node) Applied() int64 { return n.sm.log.applied } diff --git a/raft/raft.go b/raft/raft.go index a19a41753..3f89fb7c9 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -112,7 +112,7 @@ type stateMachine struct { id int64 // the term we are participating in at any time - term int64 + term atomicInt // who we voted for in term vote int64 @@ -165,7 +165,7 @@ func (sm *stateMachine) poll(id int64, v bool) (granted int) { // send persists state to stable storage and then sends to its mailbox. func (sm *stateMachine) send(m Message) { m.From = sm.id - m.Term = sm.term + m.Term = sm.term.Get() sm.msgs = append(sm.msgs, m) } @@ -206,7 +206,7 @@ func (sm *stateMachine) maybeCommit() bool { sort.Sort(sort.Reverse(mis)) mci := mis[sm.q()-1] - return sm.log.maybeCommit(mci, sm.term) + return sm.log.maybeCommit(mci, sm.term.Get()) } // nextEnts returns the appliable entries and updates the applied index @@ -215,7 +215,7 @@ func (sm *stateMachine) nextEnts() (ents []Entry) { } func (sm *stateMachine) reset(term int64) { - sm.term = term + sm.term.Set(term) sm.lead.Set(none) sm.vote = none sm.votes = make(map[int64]bool) @@ -232,7 +232,7 @@ func (sm *stateMachine) q() int { } func (sm *stateMachine) appendEntry(e Entry) { - e.Term = sm.term + e.Term = sm.term.Get() sm.log.append(sm.log.lastIndex(), e) sm.ins[sm.id].update(sm.log.lastIndex()) sm.maybeCommit() @@ -257,7 +257,7 @@ func (sm *stateMachine) becomeCandidate() { if sm.state == stateLeader { panic("invalid transition [leader -> candidate]") } - sm.reset(sm.term + 1) + sm.reset(sm.term.Get() + 1) sm.vote = sm.id sm.state = stateCandidate } @@ -267,7 +267,7 @@ func (sm *stateMachine) becomeLeader() { if sm.state == stateFollower { panic("invalid transition [follower -> leader]") } - sm.reset(sm.term) + sm.reset(sm.term.Get()) sm.lead.Set(sm.id) sm.state = stateLeader @@ -307,9 +307,9 @@ func (sm *stateMachine) Step(m Message) (ok bool) { switch { case m.Term == 0: // local message - case m.Term > sm.term: + case m.Term > sm.term.Get(): sm.becomeFollower(m.Term, m.From) - case m.Term < sm.term: + case m.Term < sm.term.Get(): // ignore return true } @@ -380,7 +380,7 @@ func stepCandidate(sm *stateMachine, m Message) bool { case msgProp: return false case msgApp: - sm.becomeFollower(sm.term, m.From) + sm.becomeFollower(sm.term.Get(), m.From) sm.handleAppendEntries(m) case msgSnap: sm.becomeFollower(m.Term, m.From) @@ -394,7 +394,7 @@ func stepCandidate(sm *stateMachine, m Message) bool { sm.becomeLeader() sm.bcastAppend() case len(sm.votes) - gr: - sm.becomeFollower(sm.term, none) + sm.becomeFollower(sm.term.Get(), none) } } return true diff --git a/raft/raft_test.go b/raft/raft_test.go index 7ea3328cb..7d2cf3692 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -33,7 +33,7 @@ func TestLeaderElection(t *testing.T) { if sm.state != tt.state { t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state) } - if g := sm.term; g != 1 { + if g := sm.term.Get(); g != 1 { t.Errorf("#%d: term = %d, want %d", i, g, 1) } } @@ -226,7 +226,7 @@ func TestDuelingCandidates(t *testing.T) { if g := tt.sm.state; g != tt.state { t.Errorf("#%d: state = %s, want %s", i, g, tt.state) } - if g := tt.sm.term; g != tt.term { + if g := tt.sm.term.Get(); g != tt.term { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } base := ltoa(tt.log) @@ -365,7 +365,7 @@ func TestProposal(t *testing.T) { } } sm := tt.network.peers[0].(*stateMachine) - if g := sm.term; g != 1 { + if g := sm.term.Get(); g != 1 { t.Errorf("#%d: term = %d, want %d", i, g, 1) } } @@ -398,7 +398,7 @@ func TestProposalByProxy(t *testing.T) { } } sm := tt.peers[0].(*stateMachine) - if g := sm.term; g != 1 { + if g := sm.term.Get(); g != 1 { t.Errorf("#%d: term = %d, want %d", i, g, 1) } } @@ -437,7 +437,7 @@ func TestCommit(t *testing.T) { for j := 0; j < len(tt.matches); j++ { ins[int64(j)] = &index{tt.matches[j], tt.matches[j] + 1} } - sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: tt.smTerm} + sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: atomicInt(tt.smTerm)} sm.maybeCommit() if g := sm.log.committed; g != tt.w { t.Errorf("#%d: committed = %d, want %d", i, g, tt.w) @@ -542,8 +542,8 @@ func TestStateTransition(t *testing.T) { sm.becomeLeader() } - if sm.term != tt.wterm { - t.Errorf("%d: term = %d, want %d", i, sm.term, tt.wterm) + if sm.term.Get() != tt.wterm { + t.Errorf("%d: term = %d, want %d", i, sm.term.Get(), tt.wterm) } if sm.lead.Get() != tt.wlead { t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead) @@ -634,8 +634,8 @@ func TestAllServerStepdown(t *testing.T) { if sm.state != tt.wstate { t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate) } - if sm.term != tt.wterm { - t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, tt.wterm) + if sm.term.Get() != tt.wterm { + t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term.Get(), tt.wterm) } if int64(len(sm.log.ents)) != tt.windex { t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), tt.windex) @@ -663,7 +663,7 @@ func TestLeaderAppResp(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() sm.Msgs() - sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term}) + sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term.Get()}) msgs := sm.Msgs() if len(msgs) != tt.wmsgNum { @@ -695,7 +695,7 @@ func TestRecvMsgBeat(t *testing.T) { for i, tt := range tests { sm := newStateMachine(0, []int64{0, 1, 2}) sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}} - sm.term = 1 + sm.term.Set(1) sm.state = tt.state sm.Step(Message{Type: msgBeat})