diff --git a/raft/log.go b/raft/log.go index 3c5266629..d69536a7c 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1,6 +1,12 @@ package raft +const ( + normal int = iota + config +) + type Entry struct { + Type int Term int Data []byte } diff --git a/raft/node.go b/raft/node.go index 674ca1c55..f9e62ca6d 100644 --- a/raft/node.go +++ b/raft/node.go @@ -33,7 +33,7 @@ func New(addr int, peer []int, heartbeat, election tick) *Node { // Propose asynchronously proposes data be applied to the underlying state machine. func (n *Node) Propose(data []byte) { - m := Message{Type: msgProp, Data: data} + m := Message{Type: msgProp, Entries: []Entry{{Data: data}}} n.Step(m) } diff --git a/raft/raft.go b/raft/raft.go index 30651c4bc..4278eca5b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -63,7 +63,6 @@ type Message struct { PrevTerm int Entries []Entry Commit int - Data []byte } type index struct { @@ -103,6 +102,9 @@ type stateMachine struct { // the leader addr lead int + + // pending reconfiguration + pendingConf bool } func newStateMachine(addr int, peer []int) *stateMachine { @@ -254,9 +256,23 @@ func (sm *stateMachine) Step(m Message) { sm.bcastAppend() return case msgProp: + if len(m.Entries) != 1 { + panic("unexpected length(entries) of a msgProp") + } + switch sm.lead { case sm.addr: - sm.log.append(sm.log.lastIndex(), Entry{Term: sm.term, Data: m.Data}) + e := m.Entries[0] + if e.Type == config { + if sm.pendingConf { + // todo: deny + return + } + sm.pendingConf = true + } + e.Term = sm.term + + sm.log.append(sm.log.lastIndex(), e) sm.ins[sm.addr].update(sm.log.lastIndex()) sm.maybeCommit() sm.bcastAppend() diff --git a/raft/raft_test.go b/raft/raft_test.go index 2e3b2dc78..a77f69fd2 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -45,16 +45,17 @@ func TestLogReplication(t *testing.T) { { newNetwork(nil, nil, nil), []Message{ - {To: 0, Type: msgProp, Data: []byte("somedata")}, + {To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, }, 1, }, { newNetwork(nil, nil, nil), []Message{ - {To: 0, Type: msgProp, Data: []byte("somedata")}, + + {To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, {To: 1, Type: msgHup}, - {To: 1, Type: msgProp, Data: []byte("somedata")}, + {To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, }, 2, }, @@ -82,8 +83,8 @@ func TestLogReplication(t *testing.T) { } } for k, m := range props { - if !bytes.Equal(ents[k].Data, m.Data) { - t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Data) + if !bytes.Equal(ents[k].Data, m.Entries[0].Data) { + t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data) } } } @@ -93,8 +94,8 @@ func TestLogReplication(t *testing.T) { func TestSingleNodeCommit(t *testing.T) { tt := newNetwork(nil) tt.send(Message{To: 0, Type: msgHup}) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*stateMachine) if sm.log.committed != 2 { @@ -111,8 +112,8 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt.cut(0, 3) tt.cut(0, 4) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) - tt.send(Message{To: 0, Type: msgProp, Data: []byte("some data")}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*stateMachine) if sm.log.committed != 0 { @@ -135,7 +136,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { // after append a entry from the current term, all entries // should be committed - tt.send(Message{To: 1, Type: msgProp, Data: []byte("some data")}) + tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) if sm.log.committed != 3 { t.Errorf("committed = %d, want %d", sm.log.committed, 3) } @@ -197,7 +198,7 @@ func TestCandidateConcede(t *testing.T) { data := []byte("force follower") // send a proposal to 2 to flush out a msgApp to 0 - tt.send(Message{To: 2, Type: msgProp, Data: data}) + tt.send(Message{To: 2, Type: msgProp, Entries: []Entry{{Data: data}}}) a := tt.peers[0].(*stateMachine) if g := a.state; g != stateFollower { @@ -284,7 +285,7 @@ func TestProposal(t *testing.T) { // promote 0 the leader send(Message{To: 0, Type: msgHup}) - send(Message{To: 0, Type: msgProp, Data: data}) + send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: data}}}) wantLog := newLog() if tt.success { @@ -320,7 +321,7 @@ func TestProposalByProxy(t *testing.T) { tt.send(Message{To: 0, Type: msgHup}) // propose via follower - tt.send(Message{To: 1, Type: msgProp, Data: []byte("somedata")}) + tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}) wantLog := &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1} base := ltoa(wantLog) @@ -491,6 +492,29 @@ func TestStateTransition(t *testing.T) { } } +func TestConf(t *testing.T) { + sm := newStateMachine(0, []int{0}) + sm.becomeCandidate() + sm.becomeLeader() + + sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}}) + if sm.log.lastIndex() != 1 { + t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1) + } + if !sm.pendingConf { + t.Errorf("pendingConf = %v, want %v", sm.pendingConf, true) + } + if sm.log.ents[1].Type != config { + t.Errorf("type = %d, want %d", sm.log.ents[1].Type, config) + } + + // deny the second configuration change request if there is a pending one + sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: config}}}) + if sm.log.lastIndex() != 1 { + t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1) + } +} + func TestAllServerStepdown(t *testing.T) { tests := []stateType{stateFollower, stateCandidate, stateLeader}