From b7cf2385e6874d07f32f183007e50f4448b7e3a2 Mon Sep 17 00:00:00 2001 From: chz Date: Sat, 19 Dec 2015 20:42:44 -0800 Subject: [PATCH] contrib/raftexample: add test, fix dead lock on proposal channel deadlock if no leader; node selects on propc=nil and writes to Ready, client blocks on propC in same select as Ready reader, and so progress of raft state machine deadlocks. --- contrib/raftexample/raft.go | 26 +++--- contrib/raftexample/raftexample_test.go | 107 ++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 contrib/raftexample/raftexample_test.go diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 555f2f558..753cc2a20 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -184,21 +184,23 @@ func (rc *raftNode) serveChannels() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - // event loop on client proposals and raft updates + // send proposals over raft + stopc := make(chan struct{}, 1) + go func() { + for prop := range rc.proposeC { + // blocks until accepted by raft state machine + rc.node.Propose(context.TODO(), []byte(prop)) + } + // client closed channel; shutdown raft if not already + stopc <- struct{}{} + }() + + // event loop on raft state machine updates for { select { case <-ticker.C: rc.node.Tick() - // send proposals over raft - case prop, ok := <-rc.proposeC: - if !ok { - // client closed channel; shut down - rc.stop() - return - } - rc.node.Propose(context.TODO(), []byte(prop)) - // store raft entries to wal, then publish over commit channel case rd := <-rc.node.Ready(): rc.wal.Save(rd.HardState, rd.Entries) @@ -210,6 +212,10 @@ func (rc *raftNode) serveChannels() { case err := <-rc.transport.ErrorC: rc.writeError(err) return + + case <-stopc: + rc.stop() + return } } } diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go new file mode 100644 index 000000000..cd6188d60 --- /dev/null +++ b/contrib/raftexample/raftexample_test.go @@ -0,0 +1,107 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "os" + "testing" +) + +type cluster struct { + peers []string + commitC []<-chan *string + errorC []<-chan error + proposeC []chan string +} + +// newCluster creates a cluster of n nodes +func newCluster(n int) *cluster { + peers := make([]string, n) + for i := range peers { + peers[i] = fmt.Sprintf("http://127.0.0.1:%d", 10000+i) + } + + clus := &cluster{ + peers: peers, + commitC: make([]<-chan *string, len(peers)), + errorC: make([]<-chan error, len(peers)), + proposeC: make([]chan string, len(peers))} + + for i := range clus.peers { + os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) + clus.proposeC[i] = make(chan string, 1) + clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i]) + // replay local log + for s := range clus.commitC[i] { + if s == nil { + break + } + } + } + + return clus +} + +// Close closes all cluster nodes and returns an error if any failed. +func (clus *cluster) Close() (err error) { + for i := range clus.peers { + close(clus.proposeC[i]) + for range clus.commitC[i] { + // drain pending commits + } + // wait for channel to close + if erri, _ := <-clus.errorC[i]; erri != nil { + err = erri + } + // clean intermediates + os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) + } + return err +} + +// TestProposeOnCommit starts three nodes and feeds commits back into the proposal +// channel. The intent is to ensure blocking on a proposal won't block raft progress. +func TestProposeOnCommit(t *testing.T) { + clus := newCluster(3) + defer func() { + if err := clus.Close(); err != nil { + t.Fatal(err) + } + }() + + donec := make(chan struct{}) + for i := range clus.peers { + // feedback for "n" committed entries, then update donec + go func(pC chan<- string, cC <-chan *string, eC <-chan error) { + for n := 0; n < 100; n++ { + select { + case s := <-cC: + pC <- *s + case err, _ := <-eC: + t.Fatalf("eC closed (%v)", err) + } + } + donec <- struct{}{} + }(clus.proposeC[i], clus.commitC[i], clus.errorC[i]) + + // one message feedback per node + go func() { clus.proposeC[i] <- "foo" }() + } + + for range clus.peers { + <-donec + } +}