From b73a11ff45a96c3515811e33f2a33f68bfc4a8e6 Mon Sep 17 00:00:00 2001 From: chz Date: Sun, 20 Dec 2015 11:01:41 -0800 Subject: [PATCH] contrib/raftexample: follow pipeline guidelines closer close raft commit channel before issuing raft error since it's done sending --- contrib/raftexample/kvstore.go | 33 +++++++++++-------------- contrib/raftexample/raft.go | 10 +++----- contrib/raftexample/raftexample_test.go | 10 +++++--- 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/contrib/raftexample/kvstore.go b/contrib/raftexample/kvstore.go index 1f1760dbd..a7dc2a8ec 100644 --- a/contrib/raftexample/kvstore.go +++ b/contrib/raftexample/kvstore.go @@ -58,25 +58,22 @@ func (s *kvstore) Propose(k string, v string) { } func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { - for { - select { - case data := <-commitC: - if data == nil { - // done replaying log; new data incoming - return - } - - var data_kv kv - dec := gob.NewDecoder(bytes.NewBufferString(*data)) - if err := dec.Decode(&data_kv); err != nil { - log.Fatalf("raftexample: could not decode message (%v)", err) - } - s.mu.Lock() - s.kvStore[data_kv.Key] = data_kv.Val - s.mu.Unlock() - case err := <-errorC: - log.Println(err) + for data := range commitC { + if data == nil { + // done replaying log; new data incoming return } + + var data_kv kv + dec := gob.NewDecoder(bytes.NewBufferString(*data)) + if err := dec.Decode(&data_kv); err != nil { + log.Fatalf("raftexample: could not decode message (%v)", err) + } + s.mu.Lock() + s.kvStore[data_kv.Key] = data_kv.Val + s.mu.Unlock() + } + if err, ok := <-errorC; ok { + log.Fatal(err) } } diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 753cc2a20..150fd74aa 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -122,12 +122,8 @@ func (rc *raftNode) replayWAL() *wal.WAL { } func (rc *raftNode) writeError(err error) { - rc.errorC <- err - rc.stop() -} - -func (rc *raftNode) stop() { close(rc.commitC) + rc.errorC <- err close(rc.errorC) rc.node.Stop() } @@ -214,7 +210,9 @@ func (rc *raftNode) serveChannels() { return case <-stopc: - rc.stop() + close(rc.commitC) + close(rc.errorC) + rc.node.Stop() return } } diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go index cd6188d60..7a70937bf 100644 --- a/contrib/raftexample/raftexample_test.go +++ b/contrib/raftexample/raftexample_test.go @@ -87,11 +87,15 @@ func TestProposeOnCommit(t *testing.T) { // feedback for "n" committed entries, then update donec go func(pC chan<- string, cC <-chan *string, eC <-chan error) { for n := 0; n < 100; n++ { + s, ok := <-cC + if !ok { + pC = nil + } select { - case s := <-cC: - pC <- *s + case pC <- *s: + continue case err, _ := <-eC: - t.Fatalf("eC closed (%v)", err) + t.Fatalf("eC message (%v)", err) } } donec <- struct{}{}