From c03fbf68d6a3ad6b754751cfd9dff27ceb0f3d4b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 5 Jun 2014 13:00:18 -0700 Subject: [PATCH] raft: add conf safety To make configuration changes safe without adding a configuration protocol: 1. We only allow adding/removing one node at a time. 2. We only allow one uncommitted configuration entry in the log. These two rules ensure that there are no disjoint quorums in either the current cluster or any future cluster (after applying any number of committed or uncommitted entries in the log). We add a Type field to the Entry structure for two reasons: 1. The state machine needs to know if there is a pending configuration change. 2. A configuration entry should be executed by the raft package rather than by the application that is using raft. --- raft/log.go | 6 ++++++ raft/node.go | 2 +- raft/raft.go | 20 +++++++++++++++++-- raft/raft_test.go | 50 +++++++++++++++++++++++++++++++++++------------ 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/raft/log.go b/raft/log.go index 3c5266629..d69536a7c 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1,6 +1,12 @@ package raft +const ( + normal int = iota + config +) + type Entry struct { + Type int Term int Data []byte } diff --git a/raft/node.go b/raft/node.go index 674ca1c55..f9e62ca6d 100644 --- a/raft/node.go +++ b/raft/node.go @@ -33,7 +33,7 @@ func New(addr int, peer []int, heartbeat, election tick) *Node { // Propose asynchronously proposes data be applied to the underlying state machine. 
func (n *Node) Propose(data []byte) { - m := Message{Type: msgProp, Data: data} + m := Message{Type: msgProp, Entries: []Entry{{Data: data}}} n.Step(m) } diff --git a/raft/raft.go b/raft/raft.go index 30651c4bc..4278eca5b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -63,7 +63,6 @@ type Message struct { PrevTerm int Entries []Entry Commit int - Data []byte } type index struct { @@ -103,6 +102,9 @@ type stateMachine struct { // the leader addr lead int + + // pending reconfiguration + pendingConf bool } func newStateMachine(addr int, peer []int) *stateMachine { @@ -254,9 +256,23 @@ func (sm *stateMachine) Step(m Message) { sm.bcastAppend() return case msgProp: + if len(m.Entries) != 1 { + panic("unexpected length(entries) of a msgProp") + } + switch sm.lead { case sm.addr: - sm.log.append(sm.log.lastIndex(), Entry{Term: sm.term, Data: m.Data}) + e := m.Entries[0] + if e.Type == config { + if sm.pendingConf { + // todo: deny + return + } + sm.pendingConf = true + } + e.Term = sm.term + + sm.log.append(sm.log.lastIndex(), e) sm.ins[sm.addr].update(sm.log.lastIndex()) sm.maybeCommit() sm.bcastAppend() diff --git a/raft/raft_test.go b/raft/raft_test.go index 2e3b2dc78..a77f69fd2 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -45,16 +45,17 @@ func TestLogReplication(t *testing.T) { { newNetwork(nil, nil, nil), []Message{ - {To: 0, Type: msgProp, Data: []byte("somedata")}, + {To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, }, 1, }, { newNetwork(nil, nil, nil), []Message{ - {To: 0, Type: msgProp, Data: []byte("somedata")}, + + {To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, {To: 1, Type: msgHup}, - {To: 1, Type: msgProp, Data: []byte("somedata")}, + {To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, }, 2, }, @@ -82,8 +83,8 @@ func TestLogReplication(t *testing.T) { } } for k, m := range props { - if !bytes.Equal(ents[k].Data, m.Data) { - t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, 
m.Data) + if !bytes.Equal(ents[k].Data, m.Entries[0].Data) { + t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data) } } } @@ -93,8 +94,8 @@ func TestLogReplication(t *testing.T) { func TestSingleNodeCommit(t *testing.T) { tt := newNetwork(nil) tt.send(Message{To: 0, Type: msgHup}) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*stateMachine) if sm.log.committed != 2 { @@ -111,8 +112,8 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt.cut(0, 3) tt.cut(0, 4) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*stateMachine) if sm.log.committed != 0 { @@ -135,7 +136,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { // after append a entry from the current term, all entries // should be committed - tt.send(Message{To: 1, Type: msgProp, Data: []byte("some data")}) + tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) if sm.log.committed != 3 { t.Errorf("committed = %d, want %d", sm.log.committed, 3) } @@ -197,7 +198,7 @@ func TestCandidateConcede(t *testing.T) { data := []byte("force follower") // send a proposal to 2 to flush out a msgApp to 0 - tt.send(Message{To: 2, Type: msgProp, Data: data}) + tt.send(Message{To: 2, Type: msgProp, Entries: []Entry{{Data: data}}}) a := tt.peers[0].(*stateMachine) if g := a.state; g != stateFollower { @@ -284,7 +285,7 @@ func TestProposal(t *testing.T) { // promote 0 the leader send(Message{To: 0, 
Type: msgHup}) - send(Message{To: 0, Type: msgProp, Data: data}) + send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: data}}}) wantLog := newLog() if tt.success { @@ -320,7 +321,7 @@ func TestProposalByProxy(t *testing.T) { tt.send(Message{To: 0, Type: msgHup}) // propose via follower - tt.send(Message{To: 1, Type: msgProp, Data: []byte("somedata")}) + tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}) wantLog := &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1} base := ltoa(wantLog) @@ -491,6 +492,29 @@ func TestStateTransition(t *testing.T) { } } +func TestConf(t *testing.T) { + sm := newStateMachine(0, []int{0}) + sm.becomeCandidate() + sm.becomeLeader() + + sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}}) + if sm.log.lastIndex() != 1 { + t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1) + } + if !sm.pendingConf { + t.Errorf("pendingConf = %v, want %v", sm.pendingConf, true) + } + if sm.log.ents[1].Type != config { + t.Errorf("type = %d, want %d", sm.log.ents[1].Type, config) + } + + // deny the second configuration change request if there is a pending one + sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}}) + if sm.log.lastIndex() != 1 { + t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1) + } +} + func TestAllServerStepdown(t *testing.T) { tests := []stateType{stateFollower, stateCandidate, stateLeader}