From eb7fef559d6b6b61176aa1dba22dab0c7538564b Mon Sep 17 00:00:00 2001 From: Adam Wolfe Gordon Date: Fri, 5 Feb 2016 11:21:41 -0700 Subject: [PATCH] contrib/raftexample: Handle conf change entries So far we don't propose conf changes, but we'll be ready to handle them when we do. --- contrib/raftexample/raft.go | 41 +++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 439d7f1d6..5bce9e808 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -81,15 +81,36 @@ func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string // whether all entries could be published. func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { for i := range ents { - if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 { - // ignore conf changes and empty messages - continue - } - s := string(ents[i].Data) - select { - case rc.commitC <- &s: - case <-rc.stopc: - return false + switch ents[i].Type { + case raftpb.EntryNormal: + if len(ents[i].Data) == 0 { + // ignore conf changes and empty messages + continue + } + s := string(ents[i].Data) + select { + case rc.commitC <- &s: + case <-rc.stopc: + return false + } + + case raftpb.EntryConfChange: + var cc raftpb.ConfChange + cc.Unmarshal(ents[i].Data) + + rc.node.ApplyConfChange(cc) + switch cc.Type { + case raftpb.ConfChangeAddNode: + if len(cc.Context) > 0 { + rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)}) + } + case raftpb.ConfChangeRemoveNode: + if cc.NodeID == uint64(rc.id) { + log.Println("I've been removed from the cluster! Shutting down.") + return false + } + rc.transport.RemovePeer(types.ID(cc.NodeID)) + } } } return true @@ -193,6 +214,8 @@ func (rc *raftNode) stop() { close(rc.commitC) close(rc.errorC) rc.node.Stop() + + os.Exit(0) } func (rc *raftNode) stopHTTP() {