From b7cf2385e6874d07f32f183007e50f4448b7e3a2 Mon Sep 17 00:00:00 2001 From: chz Date: Sat, 19 Dec 2015 20:42:44 -0800 Subject: [PATCH 1/3] contrib/raftexample: add test, fix dead lock on proposal channel deadlock if no leader; node selects on propc=nil and writes to Ready, client blocks on propC in same select as Ready reader, and so progress of raft state machine deadlocks. --- contrib/raftexample/raft.go | 26 +++--- contrib/raftexample/raftexample_test.go | 107 ++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 contrib/raftexample/raftexample_test.go diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 555f2f558..753cc2a20 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -184,21 +184,23 @@ func (rc *raftNode) serveChannels() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - // event loop on client proposals and raft updates + // send proposals over raft + stopc := make(chan struct{}, 1) + go func() { + for prop := range rc.proposeC { + // blocks until accepted by raft state machine + rc.node.Propose(context.TODO(), []byte(prop)) + } + // client closed channel; shutdown raft if not already + stopc <- struct{}{} + }() + + // event loop on raft state machine updates for { select { case <-ticker.C: rc.node.Tick() - // send proposals over raft - case prop, ok := <-rc.proposeC: - if !ok { - // client closed channel; shut down - rc.stop() - return - } - rc.node.Propose(context.TODO(), []byte(prop)) - // store raft entries to wal, then publish over commit channel case rd := <-rc.node.Ready(): rc.wal.Save(rd.HardState, rd.Entries) @@ -210,6 +212,10 @@ func (rc *raftNode) serveChannels() { case err := <-rc.transport.ErrorC: rc.writeError(err) return + + case <-stopc: + rc.stop() + return } } } diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go new file mode 100644 index 000000000..cd6188d60 --- /dev/null +++ b/contrib/raftexample/raftexample_test.go @@ -0,0 +1,107 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "os" + "testing" +) + +type cluster struct { + peers []string + commitC []<-chan *string + errorC []<-chan error + proposeC []chan string +} + +// newCluster creates a cluster of n nodes +func newCluster(n int) *cluster { + peers := make([]string, n) + for i := range peers { + peers[i] = fmt.Sprintf("http://127.0.0.1:%d", 10000+i) + } + + clus := &cluster{ + peers: peers, + commitC: make([]<-chan *string, len(peers)), + errorC: make([]<-chan error, len(peers)), + proposeC: make([]chan string, len(peers))} + + for i := range clus.peers { + os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) + clus.proposeC[i] = make(chan string, 1) + clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i]) + // replay local log + for s := range clus.commitC[i] { + if s == nil { + break + } + } + } + + return clus +} + +// Close closes all cluster nodes and returns an error if any failed. +func (clus *cluster) Close() (err error) { + for i := range clus.peers { + close(clus.proposeC[i]) + for range clus.commitC[i] { + // drain pending commits + } + // wait for channel to close + if erri, _ := <-clus.errorC[i]; erri != nil { + err = erri + } + // clean intermediates + os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) + } + return err +} + +// TestProposeOnCommit starts three nodes and feeds commits back into the proposal +// channel. The intent is to ensure blocking on a proposal won't block raft progress. +func TestProposeOnCommit(t *testing.T) { + clus := newCluster(3) + defer func() { + if err := clus.Close(); err != nil { + t.Fatal(err) + } + }() + + donec := make(chan struct{}) + for i := range clus.peers { + // feedback for "n" committed entries, then update donec + go func(pC chan<- string, cC <-chan *string, eC <-chan error) { + for n := 0; n < 100; n++ { + select { + case s := <-cC: + pC <- *s + case err, _ := <-eC: + t.Fatalf("eC closed (%v)", err) + } + } + donec <- struct{}{} + }(clus.proposeC[i], clus.commitC[i], clus.errorC[i]) + + // one message feedback per node + go func() { clus.proposeC[i] <- "foo" }() + } + + for range clus.peers { + <-donec + } +} From b73a11ff45a96c3515811e33f2a33f68bfc4a8e6 Mon Sep 17 00:00:00 2001 From: chz Date: Sun, 20 Dec 2015 11:01:41 -0800 Subject: [PATCH 2/3] contrib/raftexample: follow pipeline guidelines closer close raft commit channel before issuing raft error since it's done sending --- contrib/raftexample/kvstore.go | 33 +++++++++++-------------- contrib/raftexample/raft.go | 10 +++----- contrib/raftexample/raftexample_test.go | 10 +++++--- 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/contrib/raftexample/kvstore.go b/contrib/raftexample/kvstore.go index 1f1760dbd..a7dc2a8ec 100644 --- a/contrib/raftexample/kvstore.go +++ b/contrib/raftexample/kvstore.go @@ -58,25 +58,22 @@ func (s *kvstore) Propose(k string, v string) { } func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { - for { - select { - case data := <-commitC: - if data == nil { - // done replaying log; new data incoming - return - } - - var data_kv kv - dec := gob.NewDecoder(bytes.NewBufferString(*data)) - if err := dec.Decode(&data_kv); err != nil { - log.Fatalf("raftexample: could not decode message (%v)", err) - } - s.mu.Lock() - s.kvStore[data_kv.Key] = data_kv.Val - s.mu.Unlock() - case err := <-errorC: - log.Println(err) + for data := range commitC { + if data == nil { + // done replaying log; new data incoming return } + + var data_kv kv + dec := gob.NewDecoder(bytes.NewBufferString(*data)) + if err := dec.Decode(&data_kv); err != nil { + log.Fatalf("raftexample: could not decode message (%v)", err) + } + s.mu.Lock() + s.kvStore[data_kv.Key] = data_kv.Val + s.mu.Unlock() + } + if err, ok := <-errorC; ok { + log.Fatal(err) } } diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 753cc2a20..150fd74aa 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -122,12 +122,8 @@ func (rc *raftNode) replayWAL() *wal.WAL { } func (rc *raftNode) writeError(err error) { - rc.errorC <- err - rc.stop() -} - -func (rc *raftNode) stop() { close(rc.commitC) + rc.errorC <- err close(rc.errorC) rc.node.Stop() } @@ -214,7 +210,9 @@ func (rc *raftNode) serveChannels() { return case <-stopc: - rc.stop() + close(rc.commitC) + close(rc.errorC) + rc.node.Stop() return } } diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go index cd6188d60..7a70937bf 100644 --- a/contrib/raftexample/raftexample_test.go +++ b/contrib/raftexample/raftexample_test.go @@ -87,11 +87,15 @@ func TestProposeOnCommit(t *testing.T) { // feedback for "n" committed entries, then update donec go func(pC chan<- string, cC <-chan *string, eC <-chan error) { for n := 0; n < 100; n++ { + s, ok := <-cC + if !ok { + pC = nil + } select { - case s := <-cC: - pC <- *s + case pC <- *s: + continue case err, _ := <-eC: - t.Fatalf("eC closed (%v)", err) + t.Fatalf("eC message (%v)", err) } } donec <- struct{}{} From 63bc804253979a3c117f6826d898a17148ab8f33 Mon Sep 17 00:00:00 2001 From: chz Date: Sun, 20 Dec 2015 18:48:45 -0800 Subject: [PATCH 3/3] contrib/raftexample: shutdown rafthttp on closed proposal channel Otherwise listening ports leak across unit tests and ports won't bind. --- contrib/raftexample/listener.go | 59 +++++++++++++++++++++++ contrib/raftexample/raft.go | 63 +++++++++++++++++++------ contrib/raftexample/raftexample_test.go | 52 ++++++++++++++++---- 3 files changed, 152 insertions(+), 22 deletions(-) create mode 100644 contrib/raftexample/listener.go diff --git a/contrib/raftexample/listener.go b/contrib/raftexample/listener.go new file mode 100644 index 000000000..361dd5df3 --- /dev/null +++ b/contrib/raftexample/listener.go @@ -0,0 +1,59 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "errors" + "net" + "time" +) + +// stoppableListener sets TCP keep-alive timeouts on accepted +// connections and waits on stopc message +type stoppableListener struct { + *net.TCPListener + stopc <-chan struct{} +} + +func newStoppableListener(addr string, stopc <-chan struct{}) (*stoppableListener, error) { + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + return &stoppableListener{ln.(*net.TCPListener), stopc}, nil +} + +func (ln stoppableListener) Accept() (c net.Conn, err error) { + connc := make(chan *net.TCPConn, 1) + errc := make(chan error, 1) + go func() { + tc, err := ln.AcceptTCP() + if err != nil { + errc <- err + } else { + connc <- tc + } + }() + select { + case <-ln.stopc: + return nil, errors.New("server stopped") + case err := <-errc: + return nil, err + case tc := <-connc: + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(3 * time.Minute) + return tc, nil + } +} diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 150fd74aa..9475812c6 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -49,13 +49,16 @@ type raftNode struct { raftStorage *raft.MemoryStorage wal *wal.WAL transport *rafthttp.Transport + stopc chan struct{} // signals proposal channel closed + httpstopc chan struct{} // signals http server to shutdown + httpdonec chan struct{} // signals http server shutdown complete } // newRaftNode initiates a raft instance and returns a committed log entry // channel and error channel. Proposals for log updates are sent over the // provided the proposal channel. All log entries are replayed over the // commit channel, followed by a nil message (to indicate the channel is -// current), then new log entries. +// current), then new log entries. To shutdown, close proposeC and read errorC. func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) { rc := &raftNode{ proposeC: proposeC, @@ -65,22 +68,31 @@ func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string peers: peers, waldir: fmt.Sprintf("raftexample-%d", id), raftStorage: raft.NewMemoryStorage(), + stopc: make(chan struct{}), + httpstopc: make(chan struct{}), + httpdonec: make(chan struct{}), // rest of structure populated after WAL replay } go rc.startRaft() return rc.commitC, rc.errorC } -// publishEntries writes committed log entries to commit channel. -func (rc *raftNode) publishEntries(ents []raftpb.Entry) { +// publishEntries writes committed log entries to commit channel and returns +// whether all entries could be published. +func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { for i := range ents { if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 { // ignore conf changes and empty messages continue } s := string(ents[i].Data) - rc.commitC <- &s + select { + case rc.commitC <- &s: + case <-rc.stopc: + return false + } } + return true } // openWAL returns a WAL ready for reading. @@ -122,6 +134,7 @@ func (rc *raftNode) replayWAL() *wal.WAL { } func (rc *raftNode) writeError(err error) { + rc.stopHTTP() close(rc.commitC) rc.errorC <- err close(rc.errorC) @@ -174,6 +187,20 @@ func (rc *raftNode) startRaft() { go rc.serveChannels() } +// stop closes http, closes all channels, and stops raft. +func (rc *raftNode) stop() { + rc.stopHTTP() + close(rc.commitC) + close(rc.errorC) + rc.node.Stop() +} + +func (rc *raftNode) stopHTTP() { + rc.transport.Stop() + close(rc.httpstopc) + <-rc.httpdonec +} + func (rc *raftNode) serveChannels() { defer rc.wal.Close() @@ -181,14 +208,13 @@ func (rc *raftNode) serveChannels() { defer ticker.Stop() // send proposals over raft - stopc := make(chan struct{}, 1) go func() { for prop := range rc.proposeC { // blocks until accepted by raft state machine rc.node.Propose(context.TODO(), []byte(prop)) } // client closed channel; shutdown raft if not already - stopc <- struct{}{} + close(rc.stopc) }() // event loop on raft state machine updates @@ -202,17 +228,18 @@ func (rc *raftNode) serveChannels() { rc.wal.Save(rd.HardState, rd.Entries) rc.raftStorage.Append(rd.Entries) rc.transport.Send(rd.Messages) - rc.publishEntries(rd.Entries) + if ok := rc.publishEntries(rd.Entries); !ok { + rc.stop() + return + } rc.node.Advance() case err := <-rc.transport.ErrorC: rc.writeError(err) return - case <-stopc: - close(rc.commitC) - close(rc.errorC) - rc.node.Stop() + case <-rc.stopc: + rc.stop() return } } @@ -224,10 +251,18 @@ func (rc *raftNode) serveRaft() { log.Fatalf("raftexample: Failed parsing URL (%v)", err) } - srv := http.Server{Addr: url.Host, Handler: rc.transport.Handler()} - if err := srv.ListenAndServe(); err != nil { - log.Fatalf("raftexample: Failed serving rafthttp (%v)", err) + ln, err := newStoppableListener(url.Host, rc.httpstopc) + if err != nil { + log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err) } + + err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln) + select { + case <-rc.httpstopc: + default: + log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err) + } + close(rc.httpdonec) } func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go index 7a70937bf..0aeb91d44 100644 --- a/contrib/raftexample/raftexample_test.go +++ b/contrib/raftexample/raftexample_test.go @@ -44,15 +44,20 @@ func newCluster(n int) *cluster { os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) clus.proposeC[i] = make(chan string, 1) clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i]) - // replay local log + } + + return clus +} + +// sinkReplay reads all commits in each node's local log. +func (clus *cluster) sinkReplay() { + for i := range clus.peers { for s := range clus.commitC[i] { if s == nil { break } } } - - return clus } // Close closes all cluster nodes and returns an error if any failed. @@ -72,15 +77,19 @@ func (clus *cluster) Close() (err error) { return err } +func (clus *cluster) closeNoErrors(t *testing.T) { + if err := clus.Close(); err != nil { + t.Fatal(err) + } +} + // TestProposeOnCommit starts three nodes and feeds commits back into the proposal // channel. The intent is to ensure blocking on a proposal won't block raft progress. func TestProposeOnCommit(t *testing.T) { clus := newCluster(3) - defer func() { - if err := clus.Close(); err != nil { - t.Fatal(err) - } - }() + defer clus.closeNoErrors(t) + + clus.sinkReplay() donec := make(chan struct{}) for i := range clus.peers { @@ -109,3 +118,30 @@ func TestProposeOnCommit(t *testing.T) { <-donec } } + +// TestCloseBeforeReplay tests closing the producer before raft starts. +func TestCloseProposerBeforeReplay(t *testing.T) { + clus := newCluster(1) + // close before replay so raft never starts + defer clus.closeNoErrors(t) +} + +// TestCloseProposerInflight tests closing the producer while +// committed messages are being published to the client. +func TestCloseProposerInflight(t *testing.T) { + clus := newCluster(1) + defer clus.closeNoErrors(t) + + clus.sinkReplay() + + // some inflight ops + go func() { + clus.proposeC[0] <- "foo" + clus.proposeC[0] <- "bar" + }() + + // wait for one message + if c, ok := <-clus.commitC[0]; *c != "foo" || !ok { + t.Fatalf("Commit failed") + } +}