From 20b7df3c127a82ab4fa4d968c3f7799588540e2c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 26 Oct 2015 15:02:50 -0700 Subject: [PATCH] rafthttp: fix data races detected by go race detector Conflicts: rafthttp/pipeline.go --- rafthttp/pipeline.go | 48 ++++++++++++++++++--------------------- rafthttp/pipeline_test.go | 2 ++ rafthttp/stream_test.go | 32 ++++++++++++++++++++++---- 3 files changed, 51 insertions(+), 31 deletions(-) diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 8e6ab0e81..ef66c3f88 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -83,32 +83,34 @@ func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID } func (p *pipeline) stop() { - close(p.msgc) close(p.stopc) p.wg.Wait() } func (p *pipeline) handle() { defer p.wg.Done() - for m := range p.msgc { - start := time.Now() - err := p.post(pbutil.MustMarshal(&m)) - if err == errStopped { - return - } - end := time.Now() - if err != nil { - reportSentFailure(pipelineMsg, m) - p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error()) - if m.Type == raftpb.MsgApp && p.fs != nil { - p.fs.Fail() + for { + select { + case m := <-p.msgc: + start := time.Now() + err := p.post(pbutil.MustMarshal(&m)) + end := time.Now() + + if err != nil { + p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error()) + + reportSentFailure(pipelineMsg, m) + if m.Type == raftpb.MsgApp && p.fs != nil { + p.fs.Fail() + } + p.r.ReportUnreachable(m.To) + if isMsgSnap(m) { + p.r.ReportSnapshot(m.To, raft.SnapshotFailure) + } + continue } - p.r.ReportUnreachable(m.To) - if isMsgSnap(m) { - p.r.ReportSnapshot(m.To, raft.SnapshotFailure) - } - } else { + p.status.activate() if m.Type == raftpb.MsgApp && p.fs != nil { p.fs.Succ(end.Sub(start)) @@ -117,6 +119,8 @@ func (p *pipeline) handle() { p.r.ReportSnapshot(m.To, raft.SnapshotFinish) } reportSentDuration(pipelineMsg, m, time.Since(start)) + case <-p.stopc: + return } } } @@ -138,13 +142,6 @@ func (p *pipeline) post(data []byte) (err error) { req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Etcd-Cluster-ID", p.cid.String()) - var stopped bool - defer func() { - if stopped { - // rewrite to errStopped so the caller goroutine can stop itself - err = errStopped - } - }() done := make(chan struct{}, 1) cancel := httputil.RequestCanceler(p.tr, req) go func() { @@ -152,7 +149,6 @@ func (p *pipeline) post(data []byte) (err error) { case <-done: case <-p.stopc: waitSchedule() - stopped = true cancel() } }() diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index 86e5bf9f7..63e0d5762 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -223,9 +223,11 @@ func newRoundTripperBlocker() *roundTripperBlocker { cancel: make(map[*http.Request]chan struct{}), } } + func (t *roundTripperBlocker) unblock() { close(t.unblockc) } + func (t *roundTripperBlocker) CancelRequest(req *http.Request) { t.mu.Lock() defer t.mu.Unlock() diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index 176a9c9c4..f4039444b 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "sync" "testing" "time" @@ -35,8 +36,8 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) testutil.WaitSchedule() // previous attached connection should be closed - if prevwfc != nil && prevwfc.closed != true { - t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed) + if prevwfc != nil && prevwfc.Closed() != true { + t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.Closed()) } // starts working if _, ok := sw.writec(); ok != true { @@ -49,7 +50,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { if _, ok := sw.writec(); ok != true { t.Errorf("#%d: working status = %v, want true", i, ok) } - if wfc.written == 0 { + if wfc.Written() == 0 { t.Errorf("#%d: failed to write to the underlying connection", i) } } @@ -59,7 +60,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { if _, ok := sw.writec(); ok != false { t.Errorf("working status after stop = %v, want false", ok) } - if wfc.closed != true { + if wfc.Closed() != true { t.Errorf("failed to close the underlying connection") } } @@ -78,7 +79,7 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { if _, ok := sw.writec(); ok != false { t.Errorf("working = %v, want false", ok) } - if wfc.closed != true { + if wfc.Closed() != true { t.Errorf("failed to close the underlying connection") } } @@ -338,21 +339,42 @@ func TestCheckStreamSupport(t *testing.T) { } type fakeWriteFlushCloser struct { + mu sync.Mutex err error written int closed bool } func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) { + wfc.mu.Lock() + defer wfc.mu.Unlock() + wfc.written += len(p) return len(p), wfc.err } + func (wfc *fakeWriteFlushCloser) Flush() {} + func (wfc *fakeWriteFlushCloser) Close() error { + wfc.mu.Lock() + defer wfc.mu.Unlock() + wfc.closed = true return wfc.err } +func (wfc *fakeWriteFlushCloser) Written() int { + wfc.mu.Lock() + defer wfc.mu.Unlock() + return wfc.written +} + +func (wfc *fakeWriteFlushCloser) Closed() bool { + wfc.mu.Lock() + defer wfc.mu.Unlock() + return wfc.closed +} + type fakeStreamHandler struct { t streamType sw *streamWriter