diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 0bd717b8a..075777cca 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -153,14 +153,18 @@ func (p *pipeline) post(data []byte) (err error) { resp.Body.Close() err = checkPostResponse(resp, b, req, p.to) - // errMemberRemoved is a critical error since a removed member should - // always be stopped. So we use reportCriticalError to report it to errorc. - if err == errMemberRemoved { - reportCriticalError(err, p.errorc) - return nil + if err != nil { + p.picker.unreachable(u) + // errMemberRemoved is a critical error since a removed member should + // always be stopped. So we use reportCriticalError to report it to errorc. + if err == errMemberRemoved { + reportCriticalError(err, p.errorc) + return nil + } + return err } - return err + return nil } // waitSchedule waits other goroutines to be scheduled for a while diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 566806cc7..bc3674bd7 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -392,12 +392,14 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { lv := semver.Must(semver.NewVersion(version.Version)) if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { resp.Body.Close() + cr.picker.unreachable(u) return nil, errUnsupportedStreamType } switch resp.StatusCode { case http.StatusGone: resp.Body.Close() + cr.picker.unreachable(u) err := fmt.Errorf("the member has been permanently removed from the cluster") select { case cr.errorc <- err: @@ -408,6 +410,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { return resp.Body, nil case http.StatusNotFound: resp.Body.Close() + cr.picker.unreachable(u) return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote) case http.StatusPreconditionFailed: b, err := ioutil.ReadAll(resp.Body) @@ -416,6 +419,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { return nil, err } resp.Body.Close() + cr.picker.unreachable(u) switch strings.TrimSuffix(string(b), "\n") { case errIncompatibleVersion.Error(): @@ -430,6 +434,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { } default: resp.Body.Close() + cr.picker.unreachable(u) return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) } }