From 7d862960cc130e7762b2025f456036d7386ec809 Mon Sep 17 00:00:00 2001 From: Adam Wolfe Gordon Date: Mon, 8 Feb 2016 09:58:10 -0700 Subject: [PATCH] contrib/raftexample: Add a channel for proposing config changes Add a channel over which we can propose cluster config changes to raft. In an upcoming commit we'll add an HTTP endpoint that sends config changes over this channel. --- contrib/raftexample/main.go | 6 ++++- contrib/raftexample/raft.go | 36 ++++++++++++++++++++----- contrib/raftexample/raftexample_test.go | 24 ++++++++++------- 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/contrib/raftexample/main.go b/contrib/raftexample/main.go index c3ed76af7..800d4a648 100644 --- a/contrib/raftexample/main.go +++ b/contrib/raftexample/main.go @@ -17,6 +17,8 @@ package main import ( "flag" "strings" + + "github.com/coreos/etcd/raft/raftpb" ) func main() { @@ -27,9 +29,11 @@ func main() { proposeC := make(chan string) defer close(proposeC) + confChangeC := make(chan raftpb.ConfChange) + defer close(confChangeC) // raft provides a commit stream for the proposals from the http api - commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC) + commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC, confChangeC) // the key-value http handler will propose updates to raft serveHttpKVAPI(*kvport, proposeC, commitC, errorC) diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 5bce9e808..a52234fb3 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -36,9 +36,10 @@ import ( // A key-value stream backed by raft type raftNode struct { - proposeC <-chan string // proposed messages (k,v) - commitC chan *string // entries committed to log (k,v) - errorC chan error // errors from raft session + proposeC <-chan string // proposed messages (k,v) + confChangeC <-chan raftpb.ConfChange // proposed cluster config changes + commitC chan *string // entries committed to log (k,v) + errorC chan error // errors from raft session id int // client ID for raft session peers []string // raft peer URLs @@ -59,9 +60,12 @@ type raftNode struct { // provided the proposal channel. All log entries are replayed over the // commit channel, followed by a nil message (to indicate the channel is // current), then new log entries. To shutdown, close proposeC and read errorC. -func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) { +func newRaftNode(id int, peers []string, proposeC <-chan string, + confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) { + rc := &raftNode{ proposeC: proposeC, + confChangeC: confChangeC, commitC: make(chan *string), errorC: make(chan error), id: id, @@ -232,9 +236,27 @@ func (rc *raftNode) serveChannels() { // send proposals over raft go func() { - for prop := range rc.proposeC { - // blocks until accepted by raft state machine - rc.node.Propose(context.TODO(), []byte(prop)) + var confChangeCount uint64 = 0 + + for rc.proposeC != nil && rc.confChangeC != nil { + select { + case prop, ok := <-rc.proposeC: + if !ok { + rc.proposeC = nil + } else { + // blocks until accepted by raft state machine + rc.node.Propose(context.TODO(), []byte(prop)) + } + + case cc, ok := <-rc.confChangeC: + if !ok { + rc.confChangeC = nil + } else { + confChangeCount += 1 + cc.ID = confChangeCount + rc.node.ProposeConfChange(context.TODO(), cc) + } + } } // client closed channel; shutdown raft if not already close(rc.stopc) diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go index 8ecf78752..4be310c1b 100644 --- a/contrib/raftexample/raftexample_test.go +++ b/contrib/raftexample/raftexample_test.go @@ -18,13 +18,16 @@ import ( "fmt" "os" "testing" + + "github.com/coreos/etcd/raft/raftpb" ) type cluster struct { - peers []string - commitC []<-chan *string - errorC []<-chan error - proposeC []chan string + peers []string + commitC []<-chan *string + errorC []<-chan error + proposeC []chan string + confChangeC []chan raftpb.ConfChange } // newCluster creates a cluster of n nodes @@ -35,15 +38,18 @@ func newCluster(n int) *cluster { } clus := &cluster{ - peers: peers, - commitC: make([]<-chan *string, len(peers)), - errorC: make([]<-chan error, len(peers)), - proposeC: make([]chan string, len(peers))} + peers: peers, + commitC: make([]<-chan *string, len(peers)), + errorC: make([]<-chan error, len(peers)), + proposeC: make([]chan string, len(peers)), + confChangeC: make([]chan raftpb.ConfChange, len(peers)), + } for i := range clus.peers { os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) clus.proposeC[i] = make(chan string, 1) - clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i]) + clus.confChangeC[i] = make(chan raftpb.ConfChange, 1) + clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i], clus.confChangeC[i]) } return clus