diff --git a/rafthttp/http.go b/rafthttp/http.go index a245cc53a..fc8b77d32 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -24,6 +24,7 @@ import ( pioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/version" ) const ( @@ -125,6 +126,8 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + w.Header().Add("X-Server-Version", version.Version) + var t streamType switch path.Dir(r.URL.Path) { // backward compatibility diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index 376586758..53670c933 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -28,6 +28,7 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/version" ) func TestServeRaftPrefix(t *testing.T) { @@ -197,11 +198,14 @@ func TestServeRaftStreamPrefix(t *testing.T) { case <-time.After(time.Second): t.Fatalf("#%d: failed to attach outgoingConn", i) } + if g := rw.Header().Get("X-Server-Version"); g != version.Version { + t.Errorf("#%d: X-Server-Version = %s, want %s", i, g, version.Version) + } if conn.t != tt.wtype { - t.Errorf("$%d: type = %s, want %s", i, conn.t, tt.wtype) + t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype) } if conn.termStr != wterm { - t.Errorf("$%d: term = %s, want %s", i, conn.termStr, wterm) + t.Errorf("#%d: term = %s, want %s", i, conn.termStr, wterm) } conn.Close() } diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index 8ada077d0..ce521fbd0 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -197,15 +197,16 @@ func (t *roundTripperBlocker) unblock() { } type respRoundTripper struct { - code int - err error + code int + header http.Header + err error } func newRespRoundTripper(code int, err error) *respRoundTripper { return &respRoundTripper{code: code, err: err} } func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err + return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err } type roundTripperRecorder struct { diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 60201bfd0..c5755d1a1 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -25,9 +25,11 @@ import ( "sync" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/version" ) const ( @@ -38,6 +40,16 @@ const ( streamBufSize = 4096 ) +var ( + errUnsupportedStreamType = fmt.Errorf("unsupported stream type") + + // the key is in string format "major.minor.patch" + supportedStream = map[string][]streamType{ + "2.0.0": []streamType{streamTypeMsgApp}, + "2.1.0": []streamType{streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage}, + } +) + type streamType string func (t streamType) endpoint() string { @@ -256,13 +268,22 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr func (cr *streamReader) run() { for { - rc, err := cr.dial() + t := cr.t + rc, err := cr.dial(t) + // downgrade to streamTypeMsgApp if the remote doesn't support + // streamTypeMsgAppV2 + if t == streamTypeMsgAppV2 && err == errUnsupportedStreamType { + t = streamTypeMsgApp + rc, err = cr.dial(t) + } if err != nil { - log.Printf("rafthttp: roundtripping error: %v", err) + if err != errUnsupportedStreamType { + log.Printf("rafthttp: roundtripping error: %v", err) + } } else { - err := cr.decodeLoop(rc) + err := cr.decodeLoop(rc, t) if err != io.EOF && !isClosedConnectionError(err) { - log.Printf("rafthttp: failed to read message on stream %s due to %v", cr.t, err) + log.Printf("rafthttp: failed to read message on stream %s due to %v", t, err) } } select { @@ -276,10 +297,10 @@ func (cr *streamReader) run() { } } -func (cr *streamReader) decodeLoop(rc io.ReadCloser) error { +func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { var dec decoder cr.mu.Lock() - switch cr.t { + switch t { case streamTypeMsgApp: dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm} case streamTypeMsgAppV2: @@ -287,7 +308,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error { case streamTypeMessage: dec = &messageDecoder{r: rc} default: - log.Panicf("rafthttp: unhandled stream type %s", cr.t) + log.Panicf("rafthttp: unhandled stream type %s", t) } cr.closer = rc cr.mu.Unlock() @@ -347,14 +368,14 @@ func (cr *streamReader) isWorking() bool { return cr.closer != nil } -func (cr *streamReader) dial() (io.ReadCloser, error) { +func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { u := cr.picker.pick() cr.mu.Lock() term := cr.msgAppTerm cr.mu.Unlock() uu := u - uu.Path = path.Join(cr.t.endpoint(), cr.from.String()) + uu.Path = path.Join(t.endpoint(), cr.from.String()) req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { cr.picker.unreachable(u) @@ -362,7 +383,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) { } req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) req.Header.Set("X-Raft-To", cr.to.String()) - if cr.t == streamTypeMsgApp { + if t == streamTypeMsgApp { req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10)) } cr.mu.Lock() @@ -373,6 +394,14 @@ func (cr *streamReader) dial() (io.ReadCloser, error) { cr.picker.unreachable(u) return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err) } + + rv := serverVersion(resp.Header) + lv := semver.Must(semver.NewVersion(version.Version)) + if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { + resp.Body.Close() + return nil, errUnsupportedStreamType + } + switch resp.StatusCode { case http.StatusGone: resp.Body.Close() @@ -384,6 +413,9 @@ func (cr *streamReader) dial() (io.ReadCloser, error) { return nil, err case http.StatusOK: return resp.Body, nil + case http.StatusNotFound: + resp.Body.Close() + return nil, fmt.Errorf("local member has not been added to the peer list of member %s", cr.to) default: resp.Body.Close() return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) @@ -411,3 +443,41 @@ func isClosedConnectionError(err error) bool { operr, ok := err.(*net.OpError) return ok && operr.Err.Error() == "use of closed network connection" } + +// serverVersion returns the version from the given header. +func serverVersion(h http.Header) *semver.Version { + verStr := h.Get("X-Server-Version") + // backward compatibility with etcd 2.0 + if verStr == "" { + verStr = "2.0.0" + } + return semver.Must(semver.NewVersion(verStr)) +} + +// compareMajorMinorVersion returns an integer comparing two versions based on +// their major and minor version. The result will be 0 if a==b, -1 if a < b, +// and 1 if a > b. +func compareMajorMinorVersion(a, b *semver.Version) int { + na := &semver.Version{Major: a.Major, Minor: a.Minor} + nb := &semver.Version{Major: b.Major, Minor: b.Minor} + switch { + case na.LessThan(*nb): + return -1 + case nb.LessThan(*na): + return 1 + default: + return 0 + } +} + +// checkStreamSupport checks whether the stream type is supported in the +// given version. +func checkStreamSupport(v *semver.Version, t streamType) bool { + nv := &semver.Version{Major: v.Major, Minor: v.Minor} + for _, s := range supportedStream[nv.String()] { + if s == t { + return true + } + } + return false +} diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index 1bcfa23a1..c50311332 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -9,10 +9,12 @@ import ( "testing" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/version" ) // TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached @@ -87,13 +89,12 @@ func TestStreamReaderDialRequest(t *testing.T) { sr := &streamReader{ tr: tr, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - t: tt, from: types.ID(1), to: types.ID(2), cid: types.ID(1), msgAppTerm: 1, } - sr.dial() + sr.dial(tt) req := tr.Request() wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1") @@ -132,18 +133,23 @@ func TestStreamReaderDialResult(t *testing.T) { {http.StatusGone, nil, false, true}, } for i, tt := range tests { - tr := newRespRoundTripper(tt.code, tt.err) + h := http.Header{} + h.Add("X-Server-Version", version.Version) + tr := &respRoundTripper{ + code: tt.code, + header: h, + err: tt.err, + } sr := &streamReader{ tr: tr, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - t: streamTypeMessage, from: types.ID(1), to: types.ID(2), cid: types.ID(1), errorc: make(chan error, 1), } - _, err := sr.dial() + _, err := sr.dial(streamTypeMessage) if ok := err == nil; ok != tt.wok { t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok) } @@ -188,6 +194,30 @@ func TestStreamReaderUpdateMsgAppTerm(t *testing.T) { } } +// TestStreamReaderDialDetectUnsupport tests that dial func could find +// out that the stream type is not supported by the remote. +func TestStreamReaderDialDetectUnsupport(t *testing.T) { + for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} { + // the response from etcd 2.0 + tr := &respRoundTripper{ + code: http.StatusNotFound, + header: http.Header{}, + } + sr := &streamReader{ + tr: tr, + picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + from: types.ID(1), + to: types.ID(2), + cid: types.ID(1), + } + + _, err := sr.dial(typ) + if err != errUnsupportedStreamType { + t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType) + } + } +} + // TestStream tests that streamReader and streamWriter can build stream to // send messages between each other. func TestStream(t *testing.T) { @@ -272,6 +302,114 @@ func TestStream(t *testing.T) { } } +func TestServerVersion(t *testing.T) { + tests := []struct { + h http.Header + wv *semver.Version + }{ + // backward compatibility with etcd 2.0 + { + http.Header{}, + semver.Must(semver.NewVersion("2.0.0")), + }, + { + http.Header{"X-Server-Version": []string{"2.1.0"}}, + semver.Must(semver.NewVersion("2.1.0")), + }, + { + http.Header{"X-Server-Version": []string{"2.1.0-alpha.0+git"}}, + semver.Must(semver.NewVersion("2.1.0-alpha.0+git")), + }, + } + for i, tt := range tests { + v := serverVersion(tt.h) + if v.String() != tt.wv.String() { + t.Errorf("#%d: version = %s, want %s", i, v, tt.wv) + } + } +} + +func TestCompareMajorMinorVersion(t *testing.T) { + tests := []struct { + va, vb *semver.Version + w int + }{ + // equal to + { + semver.Must(semver.NewVersion("2.1.0")), + semver.Must(semver.NewVersion("2.1.0")), + 0, + }, + // smaller than + { + semver.Must(semver.NewVersion("2.0.0")), + semver.Must(semver.NewVersion("2.1.0")), + -1, + }, + // bigger than + { + semver.Must(semver.NewVersion("2.2.0")), + semver.Must(semver.NewVersion("2.1.0")), + 1, + }, + // ignore patch + { + semver.Must(semver.NewVersion("2.1.1")), + semver.Must(semver.NewVersion("2.1.0")), + 0, + }, + // ignore prerelease + { + semver.Must(semver.NewVersion("2.1.0-alpha.0")), + semver.Must(semver.NewVersion("2.1.0")), + 0, + }, + } + for i, tt := range tests { + if g := compareMajorMinorVersion(tt.va, tt.vb); g != tt.w { + t.Errorf("#%d: compare = %d, want %d", i, g, tt.w) + } + } +} + +func TestCheckStreamSupport(t *testing.T) { + tests := []struct { + v *semver.Version + t streamType + w bool + }{ + // support + { + semver.Must(semver.NewVersion("2.0.0")), + streamTypeMsgApp, + true, + }, + // ignore patch + { + semver.Must(semver.NewVersion("2.0.9")), + streamTypeMsgApp, + true, + }, + // ignore prerelease + { + semver.Must(semver.NewVersion("2.0.0-alpha")), + streamTypeMsgApp, + true, + }, + // not support + { + semver.Must(semver.NewVersion("2.0.0")), + streamTypeMsgAppV2, + false, + }, + } + for i, tt := range tests { + if g := checkStreamSupport(tt.v, tt.t); g != tt.w { + t.Errorf("#%d: check = %v, want %v", i, g, tt.w) + } + } +} + type fakeWriteFlushCloser struct { err error written int @@ -294,6 +432,7 @@ type fakeStreamHandler struct { } func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Server-Version", version.Version) w.(http.Flusher).Flush() c := newCloseNotifier() h.sw.attach(&outgoingConn{