From 9b986fb4c123bc8a03b82aace606d3617ca7892a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 1 Mar 2015 07:02:24 -0800 Subject: [PATCH] rafthttp: report unreachable status of the peer When it fails to send a message to the remote peer, it reports the peer as unreachable to raft. --- etcdserver/server.go | 2 ++ rafthttp/http_test.go | 2 ++ rafthttp/peer.go | 6 +++--- rafthttp/pipeline.go | 5 ++++- rafthttp/pipeline_test.go | 12 ++++++------ rafthttp/stream.go | 5 ++++- rafthttp/transport.go | 1 + 7 files changed, 22 insertions(+), 11 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 3cefd27ff..030911092 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -326,6 +326,8 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { return s.r.Step(ctx, m) } +func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) } + func (s *EtcdServer) run() { var syncC <-chan time.Time var shouldstop bool diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index d2b47da8f..4c8b4e0e3 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -162,12 +162,14 @@ func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some er type nopProcessor struct{} func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil } +func (p *nopProcessor) ReportUnreachable(id uint64) {} type errProcessor struct { err error } func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err } +func (p *errProcessor) ReportUnreachable(id uint64) {} type resWriterToError struct { code int diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 2a51b2c35..80c7232f0 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -65,9 +65,9 @@ type peer struct { func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { p := &peer{ id: to, - msgAppWriter: startStreamWriter(fs), - writer: 
startStreamWriter(fs), - pipeline: newPipeline(tr, u, to, cid, fs, errorc), + msgAppWriter: startStreamWriter(fs, r), + writer: startStreamWriter(fs, r), + pipeline: newPipeline(tr, u, to, cid, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), newURLc: make(chan string), diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 7001bfc4e..52837ea3a 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -45,6 +45,7 @@ type pipeline struct { // the url this pipeline sends to u string fs *stats.FollowerStats + r Raft errorc chan error msgc chan raftpb.Message @@ -57,13 +58,14 @@ type pipeline struct { errored error } -func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, errorc chan error) *pipeline { +func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline { p := &pipeline{ id: id, cid: cid, tr: tr, u: u, fs: fs, + r: r, errorc: errorc, msgc: make(chan raftpb.Message, pipelineBufSize), active: true, @@ -102,6 +104,7 @@ func (p *pipeline) handle() { if m.Type == raftpb.MsgApp { p.fs.Fail() } + p.r.ReportUnreachable(m.To) } else { if !p.active { log.Printf("pipeline: the connection with %s became active", p.id) diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index fceac688f..d24656258 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -32,7 +32,7 @@ import ( func TestPipelineSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} p.stop() @@ -50,7 +50,7 @@ func TestPipelineSend(t *testing.T) { func TestPipelineExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} - p := 
newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender @@ -89,7 +89,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { // it increases fail count in stats. func TestPipelineSendFailed(t *testing.T) { fs := &stats.FollowerStats{} - p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) + p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} p.stop() @@ -103,7 +103,7 @@ func TestPipelineSendFailed(t *testing.T) { func TestPipelinePost(t *testing.T) { tr := &roundTripperRecorder{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &nopProcessor{}, nil) if err := p.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } @@ -145,7 +145,7 @@ func TestPipelinePostBad(t *testing.T) { {"http://10.0.0.1", http.StatusCreated, nil}, } for i, tt := range tests { - p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, make(chan error)) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, make(chan error)) err := p.post([]byte("some data")) p.stop() @@ -166,7 +166,7 @@ func TestPipelinePostErrorc(t *testing.T) { } for i, tt := range tests { errorc := make(chan error, 1) - p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, errorc) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, errorc) p.post([]byte("some data")) p.stop() select { diff --git a/rafthttp/stream.go 
b/rafthttp/stream.go index 19130d45d..66fb38d36 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -63,6 +63,7 @@ type outgoingConn struct { // attached outgoingConn. type streamWriter struct { fs *stats.FollowerStats + r Raft mu sync.Mutex // guard field working and closer closer io.Closer @@ -74,9 +75,10 @@ type streamWriter struct { done chan struct{} } -func startStreamWriter(fs *stats.FollowerStats) *streamWriter { +func startStreamWriter(fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ fs: fs, + r: r, msgc: make(chan raftpb.Message, streamBufSize), connc: make(chan *outgoingConn), stopc: make(chan struct{}), @@ -118,6 +120,7 @@ func (cw *streamWriter) run() { log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() heartbeatc, msgc = nil, nil + cw.r.ReportUnreachable(m.To) continue } flusher.Flush() diff --git a/rafthttp/transport.go b/rafthttp/transport.go index ff447db4b..1a5b473f0 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -29,6 +29,7 @@ import ( type Raft interface { Process(ctx context.Context, m raftpb.Message) error + ReportUnreachable(id uint64) } type Transporter interface {