From ff6705b94b7a47e4f6948e9b35b5e21e0123cc92 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 19 Sep 2014 12:35:56 -0700 Subject: [PATCH] raft: add Configure, AddNode, RemoveNode Configure is used to propose a config change. AddNode and RemoveNode are used to apply a cluster change to the raft state machine. They are the basics for dynamic configuration. --- raft/doc.go | 11 +++++ raft/log.go | 5 ++ raft/node.go | 54 ++++++++++++++++++++- raft/raft.go | 30 ++++++++++++ raft/raft_test.go | 105 +++++++++++++++++++++++++++++++++++++++++ raft/raftpb/raft.pb.go | 20 ++++++++ raft/raftpb/raft.proto | 1 + 7 files changed, 225 insertions(+), 1 deletion(-) diff --git a/raft/doc.go b/raft/doc.go index d9e3d2a3b..53a42f1f7 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -61,5 +61,16 @@ data, serialize it into a byte slice and call: n.Propose(ctx, data) +To add or remove a node in a cluster, serialize the data for the configuration change +into a byte slice and call: + + n.Configure(ctx, data) + +For safety, one configuration should include at most one node +change, which is applied through: + + n.AddNode(id) + n.RemoveNode(id) + */ package raft diff --git a/raft/log.go b/raft/log.go index 562afae9f..b17dba8df 100644 --- a/raft/log.go +++ b/raft/log.go @@ -10,6 +10,11 @@ const ( defaultCompactThreshold = 10000 ) +const ( + EntryNormal int64 = iota + EntryConfig +) + type raftLog struct { ents []pb.Entry unstable int64 diff --git a/raft/node.go b/raft/node.go index d93c1292e..af0e98a0c 100644 --- a/raft/node.go +++ b/raft/node.go @@ -76,10 +76,12 @@ type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election // timeouts and heartbeat timeouts are in units of ticks. Tick() - // Campaign causes the Node to transition to candidate state and start campaigning to become leader + // Campaign causes the Node to transition to candidate state and start campaigning to become leader. 
Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error + // Configure proposes a config change. Only one config can be in the process of going through consensus at a time. + Configure(ctx context.Context, data []byte) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state @@ -88,6 +90,12 @@ type Node interface { Stop() // Compact Compact(d []byte) + // AddNode adds a node with the given id to the peer list. + // TODO: reject an already existing node + AddNode(id int64) + // RemoveNode removes a node with the given id from the peer list. + // TODO: reject a nonexistent node + RemoveNode(id int64) } // StartNode returns a new Node given a unique raft id, a list of raft peers, and @@ -114,11 +122,22 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb. 
return &n } +const ( + confAdd = iota + confRemove +) + +type conf struct { + typ int + id int64 +} + // node is the canonical implementation of the Node interface type node struct { propc chan pb.Message recvc chan pb.Message compactc chan []byte + confc chan conf readyc chan Ready tickc chan struct{} done chan struct{} @@ -129,6 +148,7 @@ func newNode() node { propc: make(chan pb.Message), recvc: make(chan pb.Message), compactc: make(chan []byte), + confc: make(chan conf), readyc: make(chan Ready), tickc: make(chan struct{}), done: make(chan struct{}), @@ -167,6 +187,7 @@ func (n *node) run(r *raft) { } select { + // TODO: buffer the config propose if there exists one case m := <-propc: m.From = r.id r.Step(m) @@ -174,6 +195,15 @@ func (n *node) run(r *raft) { r.Step(m) // raft never returns an error case d := <-n.compactc: r.compact(d) + case c := <-n.confc: + switch c.typ { + case confAdd: + r.addNode(c.id) + case confRemove: + r.removeNode(c.id) + default: + panic("unexpected conf type") + } case <-n.tickc: r.tick() case readyc <- rd: @@ -186,6 +216,10 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Index } + // TODO(yichengq): we assume that all committed config + // entries will be applied to make things easy for now. + // TODO(yichengq): it may have race because applied is set + // before entries are applied. r.raftLog.resetNextEnts() r.raftLog.resetUnstable() r.msgs = nil @@ -212,6 +246,10 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) } +func (n *node) Configure(ctx context.Context, data []byte) error { + return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig, Data: data}}}) +} + // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. 
func (n *node) Step(ctx context.Context, m pb.Message) error { @@ -241,6 +279,20 @@ func (n *node) Compact(d []byte) { } } +func (n *node) AddNode(id int64) { + select { + case n.confc <- conf{typ: confAdd, id: id}: + case <-n.done: + } +} + +func (n *node) RemoveNode(id int64) { + select { + case n.confc <- conf{typ: confRemove, id: id}: + case <-n.done: + } +} + func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready { rd := Ready{ Entries: r.raftLog.unstableEnts(), diff --git a/raft/raft.go b/raft/raft.go index 1097bb36f..988a79ace 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -105,6 +105,10 @@ type raft struct { // the leader id lead int64 + // pending configuration + // New configuration is ignored if there exists configuration unapplied. + pendingConf bool + elapsed int // number of ticks since the last msg heartbeatTimeout int electionTimeout int @@ -245,6 +249,7 @@ func (r *raft) reset(term int64) { r.prs[i].match = r.raftLog.lastIndex() } } + r.pendingConf = false } func (r *raft) q() int { @@ -308,6 +313,15 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader + for _, e := range r.raftLog.entries(r.raftLog.committed + 1) { + if e.Type != EntryConfig { + continue + } + if r.pendingConf { + panic("unexpected double uncommitted config entry") + } + r.pendingConf = true + } r.appendEntry(pb.Entry{Data: nil}) } @@ -373,6 +387,16 @@ func (r *raft) handleSnapshot(m pb.Message) { } } +func (r *raft) addNode(id int64) { + r.setProgress(id, 0, r.raftLog.lastIndex()+1) + r.pendingConf = false +} + +func (r *raft) removeNode(id int64) { + r.delProgress(id) + r.pendingConf = false +} + type stepFunc func(r *raft, m pb.Message) func stepLeader(r *raft, m pb.Message) { @@ -384,6 +408,12 @@ func stepLeader(r *raft, m pb.Message) { panic("unexpected length(entries) of a msgProp") } e := m.Entries[0] + if e.Type == EntryConfig { + if r.pendingConf { + return + } + r.pendingConf = true + } 
r.appendEntry(e) r.bcastAppend() case msgAppResp: diff --git a/raft/raft_test.go b/raft/raft_test.go index 24c15e622..37274f683 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -948,6 +948,111 @@ func TestSlowNodeRestore(t *testing.T) { } } +// TestStepConfig tests that when raft steps a msgProp of EntryConfig type, +// it appends the entry to the log and sets pendingConf to true. +func TestStepConfig(t *testing.T) { + // a raft that cannot make progress + r := newRaft(1, []int64{1, 2}, 0, 0) + r.becomeCandidate() + r.becomeLeader() + index := r.raftLog.lastIndex() + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + if g := r.raftLog.lastIndex(); g != index+1 { + t.Errorf("index = %d, want %d", g, index+1) + } + if r.pendingConf != true { + t.Errorf("pendingConf = %v, want true", r.pendingConf) + } +} + +// TestStepIgnoreConfig tests that if raft steps a second msgProp of +// EntryConfig type while the first one is uncommitted, the node will deny +// the proposal and keep its original state. +func TestStepIgnoreConfig(t *testing.T) { + // a raft that cannot make progress + r := newRaft(1, []int64{1, 2}, 0, 0) + r.becomeCandidate() + r.becomeLeader() + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + index := r.raftLog.lastIndex() + pendingConf := r.pendingConf + r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: EntryConfig}}}) + if g := r.raftLog.lastIndex(); g != index { + t.Errorf("index = %d, want %d", g, index) + } + if r.pendingConf != pendingConf { + t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf) + } +} + +// TestRecoverPendingConfig tests that a new leader recovers its pendingConf flag +// based on uncommitted entries. 
+func TestRecoverPendingConfig(t *testing.T) { + tests := []struct { + entType int64 + wpending bool + }{ + {EntryNormal, false}, + {EntryConfig, true}, + } + for i, tt := range tests { + r := newRaft(1, []int64{1, 2}, 0, 0) + r.appendEntry(pb.Entry{Type: tt.entType}) + r.becomeCandidate() + r.becomeLeader() + if r.pendingConf != tt.wpending { + t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending) + } + } +} + +// TestRecoverDoublePendingConfig tests that new leader will panic if +// there exist two uncommitted config entries. +func TestRecoverDoublePendingConfig(t *testing.T) { + func() { + defer func() { + if err := recover(); err == nil { + t.Errorf("expect panic, but nothing happens") + } + }() + r := newRaft(1, []int64{1, 2}, 0, 0) + r.appendEntry(pb.Entry{Type: EntryConfig}) + r.appendEntry(pb.Entry{Type: EntryConfig}) + r.becomeCandidate() + r.becomeLeader() + }() +} + +// TestAddNode tests that addNode could update pendingConf and peer list correctly. +func TestAddNode(t *testing.T) { + r := newRaft(1, []int64{1}, 0, 0) + r.pendingConf = true + r.addNode(2) + if r.pendingConf != false { + t.Errorf("pendingConf = %v, want false", r.pendingConf) + } + nodes := r.nodes() + sort.Sort(int64Slice(nodes)) + wnodes := []int64{1, 2} + if !reflect.DeepEqual(nodes, wnodes) { + t.Errorf("nodes = %v, want %v", nodes, wnodes) + } +} + +// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly. 
+func TestRemoveNode(t *testing.T) { + r := newRaft(1, []int64{1, 2}, 0, 0) + r.pendingConf = true + r.removeNode(2) + if r.pendingConf != false { + t.Errorf("pendingConf = %v, want false", r.pendingConf) + } + w := []int64{1} + if g := r.nodes(); !reflect.DeepEqual(g, w) { + t.Errorf("nodes = %v, want %v", g, w) + } +} + func ents(terms ...int64) *raft { ents := []pb.Entry{{}} for _, term := range terms { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 148c302e2..1f5808f21 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -41,6 +41,7 @@ func (m *Info) String() string { return proto.CompactTextString(m) } func (*Info) ProtoMessage() {} type Entry struct { + Type int64 `protobuf:"varint,1,req,name=type" json:"type"` Term int64 `protobuf:"varint,2,req,name=term" json:"term"` Index int64 `protobuf:"varint,3,req,name=index" json:"index"` Data []byte `protobuf:"bytes,4,opt,name=data" json:"data"` @@ -169,6 +170,21 @@ func (m *Entry) Unmarshal(data []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Type |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } case 2: if wireType != 0 { return code_google_com_p_gogoprotobuf_proto.ErrWrongType @@ -648,6 +664,7 @@ func (m *Info) Size() (n int) { func (m *Entry) Size() (n int) { var l int _ = l + n += 1 + sovRaft(uint64(m.Type)) n += 1 + sovRaft(uint64(m.Term)) n += 1 + sovRaft(uint64(m.Index)) l = len(m.Data) @@ -760,6 +777,9 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) { _ = i var l int _ = l + data[i] = 0x8 + i++ + i = encodeVarintRaft(data, i, uint64(m.Type)) data[i] = 0x10 i++ i = encodeVarintRaft(data, i, uint64(m.Term)) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index d6bf94ab0..f5249c1a5 
100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -12,6 +12,7 @@ message Info { } message Entry { + required int64 type = 1 [(gogoproto.nullable) = false]; required int64 term = 2 [(gogoproto.nullable) = false]; required int64 index = 3 [(gogoproto.nullable) = false]; optional bytes data = 4 [(gogoproto.nullable) = false];