diff --git a/etcdserver/server.go b/etcdserver/server.go index 48862b0ef..6e2902c56 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -326,6 +326,8 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { return s.r.Step(ctx, m) } +func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) } + func (s *EtcdServer) run() { var syncC <-chan time.Time var shouldstop bool diff --git a/raft/raft.go b/raft/raft.go index 585f267d3..2c1c96663 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -117,11 +117,12 @@ func (pr *Progress) waitDecr(i int) { pr.Wait = 0 } } -func (pr *Progress) waitSet(w int) { pr.Wait = w } -func (pr *Progress) waitReset() { pr.Wait = 0 } -func (pr *Progress) reachable() { pr.Unreachable = false } -func (pr *Progress) unreachable() { pr.Unreachable = true } -func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 } +func (pr *Progress) waitSet(w int) { pr.Wait = w } +func (pr *Progress) waitReset() { pr.Wait = 0 } +func (pr *Progress) isUnreachable() bool { return pr.Unreachable } +func (pr *Progress) reachable() { pr.Unreachable = false } +func (pr *Progress) unreachable() { pr.Unreachable = true } +func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 } func (pr *Progress) hasPendingSnapshot() bool { return pr.PendingSnapshot != 0 } func (pr *Progress) setPendingSnapshot(i uint64) { pr.PendingSnapshot = i } @@ -269,7 +270,7 @@ func (r *raft) sendAppend(to uint64) { m := pb.Message{} m.To = to if r.needSnapshot(pr.Next) { - if pr.Unreachable { + if pr.isUnreachable() { // do not try to send snapshot until the Progress is // reachable return @@ -297,9 +298,9 @@ func (r *raft) sendAppend(to uint64) { m.Commit = r.raftLog.committed // optimistically increase the next if the follower // has been matched. - if n := len(m.Entries); pr.Match != 0 && !pr.Unreachable && n != 0 { + if n := len(m.Entries); pr.Match != 0 && !pr.isUnreachable() && n != 0 { pr.optimisticUpdate(m.Entries[n-1].Index) - } else if pr.Match == 0 || pr.Unreachable { + } else if pr.Match == 0 || pr.isUnreachable() { pr.waitSet(r.heartbeatTimeout) } } @@ -535,7 +536,10 @@ func stepLeader(r *raft, m pb.Message) { r.appendEntry(m.Entries...) r.bcastAppend() case pb.MsgAppResp: - pr.reachable() + if pr.isUnreachable() { + pr.reachable() + log.Printf("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) + } if m.Reject { log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d", r.id, m.RejectHint, m.From, m.Index) @@ -558,7 +562,10 @@ func stepLeader(r *raft, m pb.Message) { } } case pb.MsgHeartbeatResp: - pr.reachable() + if pr.isUnreachable() { + pr.reachable() + log.Printf("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) + } if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) } @@ -581,7 +588,10 @@ func stepLeader(r *raft, m pb.Message) { pr.waitSet(r.electionTimeout) } case pb.MsgUnreachable: - r.prs[m.From].unreachable() + if !pr.isUnreachable() { + pr.unreachable() + log.Printf("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr) + } } } diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index d2b47da8f..4c8b4e0e3 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -162,12 +162,14 @@ func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some er type nopProcessor struct{} func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil } +func (p *nopProcessor) ReportUnreachable(id uint64) {} type errProcessor struct { err error } func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err } +func (p *errProcessor) ReportUnreachable(id uint64) {} type resWriterToError struct { code int diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 2a51b2c35..80c7232f0 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -65,9 +65,9 @@ type peer struct { func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { p := &peer{ id: to, - msgAppWriter: startStreamWriter(fs), - writer: startStreamWriter(fs), - pipeline: newPipeline(tr, u, to, cid, fs, errorc), + msgAppWriter: startStreamWriter(fs, r), + writer: startStreamWriter(fs, r), + pipeline: newPipeline(tr, u, to, cid, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), newURLc: make(chan string), diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 7001bfc4e..52837ea3a 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -45,6 +45,7 @@ type pipeline struct { // the url this pipeline sends to u string fs *stats.FollowerStats + r Raft errorc chan error msgc chan raftpb.Message @@ -57,13 +58,14 @@ type pipeline struct { errored error } -func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, errorc chan error) *pipeline { +func newPipeline(tr http.RoundTripper, u string, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline { p := &pipeline{ id: id, cid: cid, tr: tr, u: u, fs: fs, + r: r, errorc: errorc, msgc: make(chan raftpb.Message, pipelineBufSize), active: true, @@ -102,6 +104,7 @@ func (p *pipeline) handle() { if m.Type == raftpb.MsgApp { p.fs.Fail() } + p.r.ReportUnreachable(m.To) } else { if !p.active { log.Printf("pipeline: the connection with %s became active", p.id) diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index fceac688f..d24656258 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -32,7 +32,7 @@ import ( func TestPipelineSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} p.stop() @@ -50,7 +50,7 @@ func TestPipelineSend(t *testing.T) { func TestPipelineExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender @@ -89,7 +89,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { // it increases fail count in stats. func TestPipelineSendFailed(t *testing.T) { fs := &stats.FollowerStats{} - p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, nil) + p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} p.stop() @@ -103,7 +103,7 @@ func TestPipelineSendFailed(t *testing.T) { func TestPipelinePost(t *testing.T) { tr := &roundTripperRecorder{} - p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, nil) + p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &nopProcessor{}, nil) if err := p.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } @@ -145,7 +145,7 @@ func TestPipelinePostBad(t *testing.T) { {"http://10.0.0.1", http.StatusCreated, nil}, } for i, tt := range tests { - p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, make(chan error)) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, make(chan error)) err := p.post([]byte("some data")) p.stop() @@ -166,7 +166,7 @@ func TestPipelinePostErrorc(t *testing.T) { } for i, tt := range tests { errorc := make(chan error, 1) - p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, errorc) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, errorc) p.post([]byte("some data")) p.stop() select { diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 19130d45d..66fb38d36 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -63,6 +63,7 @@ type outgoingConn struct { // attached outgoingConn. type streamWriter struct { fs *stats.FollowerStats + r Raft mu sync.Mutex // guard field working and closer closer io.Closer @@ -74,9 +75,10 @@ type streamWriter struct { done chan struct{} } -func startStreamWriter(fs *stats.FollowerStats) *streamWriter { +func startStreamWriter(fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ fs: fs, + r: r, msgc: make(chan raftpb.Message, streamBufSize), connc: make(chan *outgoingConn), stopc: make(chan struct{}), @@ -118,6 +120,7 @@ func (cw *streamWriter) run() { log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() heartbeatc, msgc = nil, nil + cw.r.ReportUnreachable(m.To) continue } flusher.Flush() diff --git a/rafthttp/transport.go b/rafthttp/transport.go index ff447db4b..1a5b473f0 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -29,6 +29,7 @@ import ( type Raft interface { Process(ctx context.Context, m raftpb.Message) error + ReportUnreachable(id uint64) } type Transporter interface {