From f725f6a5527b1aa5a36f1f4cc3417d6d9f37164c Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 19 Oct 2015 23:41:01 -0700 Subject: [PATCH 1/4] rafthttp: deprecate streamTypeMsgApp streamTypeMsgApp is only used in etcd 2.0. etcd 2.3 should not talk to etcd 2.0, either send or receive requests. So I deprecate streamTypeMsgApp and its related stuffs from rafthttp package. updating term is only used from streamTypeMsgApp, so it is removed too. --- rafthttp/functional_test.go | 1 - rafthttp/http.go | 8 +-- rafthttp/http_test.go | 12 ---- rafthttp/msgapp_codec.go | 98 -------------------------------- rafthttp/msgapp_codec_test.go | 70 ----------------------- rafthttp/peer.go | 14 ++--- rafthttp/stream.go | 103 ++++++++-------------------------- rafthttp/stream_test.go | 88 ++++++----------------------- rafthttp/transport.go | 23 +------- rafthttp/transport_test.go | 6 -- 10 files changed, 46 insertions(+), 377 deletions(-) delete mode 100644 rafthttp/msgapp_codec.go delete mode 100644 rafthttp/msgapp_codec_test.go diff --git a/rafthttp/functional_test.go b/rafthttp/functional_test.go index 542c281f7..50ae4c061 100644 --- a/rafthttp/functional_test.go +++ b/rafthttp/functional_test.go @@ -66,7 +66,6 @@ func TestSendMessage(t *testing.T) { tests := []raftpb.Message{ // these messages are set to send to itself, which facilitates testing. {Type: raftpb.MsgProp, From: 1, To: 2, Entries: []raftpb.Entry{{Data: data}}}, - // TODO: send out MsgApp which fits msgapp stream but the term doesn't match {Type: raftpb.MsgApp, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0, Entries: []raftpb.Entry{{Index: 4, Term: 1, Data: data}}, Commit: 3}, {Type: raftpb.MsgAppResp, From: 1, To: 2, Term: 1, Index: 3}, {Type: raftpb.MsgVote, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0}, diff --git a/rafthttp/http.go b/rafthttp/http.go index 8547dce08..ae14d4202 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -228,12 +228,9 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var t streamType switch path.Dir(r.URL.Path) { - // backward compatibility - case RaftStreamPrefix: - t = streamTypeMsgApp - case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)): + case streamTypeMsgAppV2.endpoint(): t = streamTypeMsgAppV2 - case path.Join(RaftStreamPrefix, string(streamTypeMessage)): + case streamTypeMessage.endpoint(): t = streamTypeMessage default: plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) @@ -278,7 +275,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { c := newCloseNotifier() conn := &outgoingConn{ t: t, - termStr: r.Header.Get("X-Raft-Term"), Writer: w, Flusher: w.(http.Flusher), Closer: c, diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index 0f94d4206..c4ef9672a 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -170,11 +170,6 @@ func TestServeRaftStreamPrefix(t *testing.T) { RaftStreamPrefix + "/msgapp/1", streamTypeMsgAppV2, }, - // backward compatibility - { - RaftStreamPrefix + "/1", - streamTypeMsgApp, - }, } for i, tt := range tests { req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil) @@ -184,8 +179,6 @@ func TestServeRaftStreamPrefix(t *testing.T) { req.Header.Set("X-Etcd-Cluster-ID", "1") req.Header.Set("X-Server-Version", version.Version) req.Header.Set("X-Raft-To", "2") - wterm := "1" - req.Header.Set("X-Raft-Term", wterm) peer := newFakePeer() peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}} @@ -206,9 +199,6 @@ func TestServeRaftStreamPrefix(t *testing.T) { if conn.t != tt.wtype { t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype) } - if conn.termStr != wterm { - t.Errorf("#%d: term = %s, want %s", i, conn.termStr, wterm) - } conn.Close() } } @@ -352,7 +342,6 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } type fakePeer struct { msgs []raftpb.Message urls types.URLs - term uint64 connc chan *outgoingConn } @@ -364,7 +353,6 @@ func newFakePeer() *fakePeer { func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls } -func (pr *fakePeer) setTerm(term uint64) { pr.term = term } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } func (pr *fakePeer) activeSince() time.Time { return time.Time{} } func (pr *fakePeer) stop() {} diff --git a/rafthttp/msgapp_codec.go b/rafthttp/msgapp_codec.go deleted file mode 100644 index 895dde2ce..000000000 --- a/rafthttp/msgapp_codec.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rafthttp - -import ( - "encoding/binary" - "io" - "time" - - "github.com/coreos/etcd/etcdserver/stats" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" -) - -// msgAppEncoder is a optimized encoder for append messages. It assumes -// that the decoder has enough information to recover the fields except -// Entries, and it writes only Entries into the Writer. -// It MUST be used with a paired msgAppDecoder. -type msgAppEncoder struct { - w io.Writer - // TODO: move the fs stats and use new metrics - fs *stats.FollowerStats -} - -func (enc *msgAppEncoder) encode(m raftpb.Message) error { - if isLinkHeartbeatMessage(m) { - return binary.Write(enc.w, binary.BigEndian, uint64(0)) - } - - start := time.Now() - ents := m.Entries - l := len(ents) - // There is no need to send empty ents, and it avoids confusion with - // heartbeat. - if l == 0 { - return nil - } - if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil { - return err - } - for i := 0; i < l; i++ { - ent := &ents[i] - if err := writeEntryTo(enc.w, ent); err != nil { - return err - } - } - enc.fs.Succ(time.Since(start)) - return nil -} - -// msgAppDecoder is a optimized decoder for append messages. It reads data -// from the Reader and parses it into Entries, then builds messages. -type msgAppDecoder struct { - r io.Reader - local, remote types.ID - term uint64 -} - -func (dec *msgAppDecoder) decode() (raftpb.Message, error) { - var m raftpb.Message - var l uint64 - if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil { - return m, err - } - if l == 0 { - return linkHeartbeatMessage, nil - } - ents := make([]raftpb.Entry, int(l)) - for i := 0; i < int(l); i++ { - ent := &ents[i] - if err := readEntryFrom(dec.r, ent); err != nil { - return m, err - } - } - - m = raftpb.Message{ - Type: raftpb.MsgApp, - From: uint64(dec.remote), - To: uint64(dec.local), - Term: dec.term, - LogTerm: dec.term, - Index: ents[0].Index - 1, - Entries: ents, - } - return m, nil -} diff --git a/rafthttp/msgapp_codec_test.go b/rafthttp/msgapp_codec_test.go deleted file mode 100644 index 1680bc8db..000000000 --- a/rafthttp/msgapp_codec_test.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2015 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rafthttp - -import ( - "bytes" - "reflect" - "testing" - - "github.com/coreos/etcd/etcdserver/stats" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" -) - -func TestMsgApp(t *testing.T) { - tests := []raftpb.Message{ - { - Type: raftpb.MsgApp, - From: 1, - To: 2, - Term: 1, - LogTerm: 1, - Index: 3, - Entries: []raftpb.Entry{{Term: 1, Index: 4}}, - }, - { - Type: raftpb.MsgApp, - From: 1, - To: 2, - Term: 1, - LogTerm: 1, - Index: 0, - Entries: []raftpb.Entry{ - {Term: 1, Index: 1, Data: []byte("some data")}, - {Term: 1, Index: 2, Data: []byte("some data")}, - {Term: 1, Index: 3, Data: []byte("some data")}, - }, - }, - linkHeartbeatMessage, - } - for i, tt := range tests { - b := &bytes.Buffer{} - enc := &msgAppEncoder{w: b, fs: &stats.FollowerStats{}} - if err := enc.encode(tt); err != nil { - t.Errorf("#%d: unexpected encode message error: %v", i, err) - continue - } - dec := &msgAppDecoder{r: b, local: types.ID(tt.To), remote: types.ID(tt.From), term: tt.Term} - m, err := dec.decode() - if err != nil { - t.Errorf("#%d: unexpected decode message error: %v", i, err) - continue - } - if !reflect.DeepEqual(m, tt) { - t.Errorf("#%d: message = %+v, want %+v", i, m, tt) - } - } -} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index fd53e97d8..5294e938a 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -60,8 +60,6 @@ type Peer interface { send(m raftpb.Message) // update updates the urls of remote peer. update(urls types.URLs) - // setTerm sets the term of ongoing communication. - setTerm(term uint64) // attachOutgoingConn attachs the outgoing connection to the peer for // stream usage. After the call, the ownership of the outgoing // connection hands over to the peer. The peer will close the connection @@ -104,7 +102,6 @@ type peer struct { recvc chan raftpb.Message propc chan raftpb.Message newURLsC chan types.URLs - termc chan uint64 // for testing pausec chan struct{} @@ -114,7 +111,7 @@ type peer struct { done chan struct{} } -func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64, v3demo bool) *peer { +func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer { picker := newURLPicker(urls) status := newPeerStatus(to) p := &peer{ @@ -130,7 +127,6 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), newURLsC: make(chan types.URLs), - termc: make(chan uint64), pausec: make(chan struct{}), resumec: make(chan struct{}), stopc: make(chan struct{}), @@ -153,8 +149,8 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t } }() - p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc, term) - reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc, term) + p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) + reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) go func() { var paused bool for { @@ -222,12 +218,10 @@ func (p *peer) update(urls types.URLs) { } } -func (p *peer) setTerm(term uint64) { p.msgAppReader.updateMsgAppTerm(term) } - func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { - case streamTypeMsgApp, streamTypeMsgAppV2: + case streamTypeMsgAppV2: ok = p.msgAppWriter.attach(conn) case streamTypeMessage: ok = p.writer.attach(conn) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index c8db968c2..5a9f9e0ec 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -21,7 +21,6 @@ import ( "net" "net/http" "path" - "strconv" "strings" "sync" "time" @@ -37,7 +36,6 @@ import ( const ( streamTypeMessage streamType = "message" streamTypeMsgAppV2 streamType = "msgappv2" - streamTypeMsgApp streamType = "msgapp" streamBufSize = 4096 ) @@ -47,9 +45,9 @@ var ( // the key is in string format "major.minor.patch" supportedStream = map[string][]streamType{ - "2.0.0": {streamTypeMsgApp}, - "2.1.0": {streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage}, - "2.2.0": {streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage}, + "2.0.0": {}, + "2.1.0": {streamTypeMsgAppV2, streamTypeMessage}, + "2.2.0": {streamTypeMsgAppV2, streamTypeMessage}, } ) @@ -57,8 +55,6 @@ type streamType string func (t streamType) endpoint() string { switch t { - case streamTypeMsgApp: // for backward compatibility of v2.0 - return RaftStreamPrefix case streamTypeMsgAppV2: return path.Join(RaftStreamPrefix, "msgapp") case streamTypeMessage: @@ -71,8 +67,6 @@ func (t streamType) endpoint() string { func (t streamType) String() string { switch t { - case streamTypeMsgApp: - return "stream MsgApp" case streamTypeMsgAppV2: return "stream MsgApp v2" case streamTypeMessage: @@ -94,8 +88,7 @@ func isLinkHeartbeatMessage(m raftpb.Message) bool { } type outgoingConn struct { - t streamType - termStr string + t streamType io.Writer http.Flusher io.Closer @@ -138,7 +131,6 @@ func (cw *streamWriter) run() { var msgc chan raftpb.Message var heartbeatc <-chan time.Time var t streamType - var msgAppTerm uint64 var enc encoder var flusher http.Flusher tickc := time.Tick(ConnReadTimeout / 3) @@ -158,16 +150,6 @@ func (cw *streamWriter) run() { flusher.Flush() reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start)) case m := <-msgc: - if t == streamTypeMsgApp && m.Term != msgAppTerm { - // TODO: reasonable retry logic - if m.Term > msgAppTerm { - cw.close() - heartbeatc, msgc = nil, nil - // TODO: report to raft at peer level - cw.r.ReportUnreachable(m.To) - } - continue - } start := time.Now() if err := enc.encode(m); err != nil { reportSentFailure(string(t), m) @@ -184,13 +166,6 @@ func (cw *streamWriter) run() { cw.close() t = conn.t switch conn.t { - case streamTypeMsgApp: - var err error - msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64) - if err != nil { - plog.Panicf("could not parse term %s to uint (%v)", conn.termStr, err) - } - enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs} case streamTypeMsgAppV2: enc = newMsgAppV2Encoder(conn.Writer, cw.fs) case streamTypeMessage: @@ -260,29 +235,27 @@ type streamReader struct { propc chan<- raftpb.Message errorc chan<- error - mu sync.Mutex - msgAppTerm uint64 - cancel func() - closer io.Closer - stopc chan struct{} - done chan struct{} + mu sync.Mutex + cancel func() + closer io.Closer + stopc chan struct{} + done chan struct{} } -func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error, term uint64) *streamReader { +func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { r := &streamReader{ - tr: tr, - picker: picker, - t: t, - local: local, - remote: remote, - cid: cid, - status: status, - recvc: recvc, - propc: propc, - errorc: errorc, - msgAppTerm: term, - stopc: make(chan struct{}), - done: make(chan struct{}), + tr: tr, + picker: picker, + t: t, + local: local, + remote: remote, + cid: cid, + status: status, + recvc: recvc, + propc: propc, + errorc: errorc, + stopc: make(chan struct{}), + done: make(chan struct{}), } go r.run() return r @@ -292,12 +265,6 @@ func (cr *streamReader) run() { for { t := cr.t rc, err := cr.dial(t) - // downgrade to streamTypeMsgApp if the remote doesn't support - // streamTypeMsgAppV2 - if t == streamTypeMsgAppV2 && err == errUnsupportedStreamType { - t = streamTypeMsgApp - rc, err = cr.dial(t) - } if err != nil { if err != errUnsupportedStreamType { cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error()) @@ -310,9 +277,6 @@ func (cr *streamReader) run() { case err == io.EOF: // connection is closed by the remote case isClosedConnectionError(err): - // stream msgapp is only used for etcd 2.0, and etcd 2.0 doesn't - // heartbeat on the idle stream, so it is expected to time out. - case t == streamTypeMsgApp && isNetworkTimeoutError(err): default: cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) } @@ -332,8 +296,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { var dec decoder cr.mu.Lock() switch t { - case streamTypeMsgApp: - dec = &msgAppDecoder{r: rc, local: cr.local, remote: cr.remote, term: cr.msgAppTerm} case streamTypeMsgAppV2: dec = newMsgAppV2Decoder(rc, cr.local, cr.remote) case streamTypeMessage: @@ -377,20 +339,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { } } -// updateMsgAppTerm updates the term for MsgApp stream, and closes -// the existing MsgApp stream if term is updated. -func (cr *streamReader) updateMsgAppTerm(term uint64) { - cr.mu.Lock() - defer cr.mu.Unlock() - if cr.msgAppTerm >= term { - return - } - cr.msgAppTerm = term - if cr.t == streamTypeMsgApp { - cr.close() - } -} - func (cr *streamReader) stop() { close(cr.stopc) cr.mu.Lock() @@ -410,10 +358,6 @@ func (cr *streamReader) isWorking() bool { func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { u := cr.picker.pick() - cr.mu.Lock() - term := cr.msgAppTerm - cr.mu.Unlock() - uu := u uu.Path = path.Join(t.endpoint(), cr.local.String()) @@ -427,9 +371,6 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) req.Header.Set("X-Raft-To", cr.remote.String()) - if t == streamTypeMsgApp { - req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10)) - } cr.mu.Lock() select { diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index c4058b8fc..3dfa4ced4 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -98,15 +98,14 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { } func TestStreamReaderDialRequest(t *testing.T) { - for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage, streamTypeMsgAppV2} { + for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} { tr := &roundTripperRecorder{} sr := &streamReader{ - tr: tr, - picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - local: types.ID(1), - remote: types.ID(2), - cid: types.ID(1), - msgAppTerm: 1, + tr: tr, + picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + local: types.ID(1), + remote: types.ID(2), + cid: types.ID(1), } sr.dial(tt) @@ -124,9 +123,6 @@ func TestStreamReaderDialRequest(t *testing.T) { if g := req.Header.Get("X-Raft-To"); g != "2" { t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g) } - if g := req.Header.Get("X-Raft-Term"); tt == streamTypeMsgApp && g != "1" { - t.Errorf("#%d: header X-Raft-Term = %s, want 1", i, g) - } } } @@ -173,41 +169,6 @@ func TestStreamReaderDialResult(t *testing.T) { } } -func TestStreamReaderUpdateMsgAppTerm(t *testing.T) { - term := uint64(2) - tests := []struct { - term uint64 - typ streamType - wterm uint64 - wclose bool - }{ - // lower term - {1, streamTypeMsgApp, 2, false}, - // unchanged term - {2, streamTypeMsgApp, 2, false}, - // higher term - {3, streamTypeMessage, 3, false}, - {3, streamTypeMsgAppV2, 3, false}, - // higher term, reset closer - {3, streamTypeMsgApp, 3, true}, - } - for i, tt := range tests { - closer := &fakeWriteFlushCloser{} - cr := &streamReader{ - msgAppTerm: term, - t: tt.typ, - closer: closer, - } - cr.updateMsgAppTerm(tt.term) - if cr.msgAppTerm != tt.wterm { - t.Errorf("#%d: term = %d, want %d", i, cr.msgAppTerm, tt.wterm) - } - if closer.closed != tt.wclose { - t.Errorf("#%d: closed = %v, want %v", i, closer.closed, tt.wclose) - } - } -} - // TestStreamReaderDialDetectUnsupport tests that dial func could find // out that the stream type is not supported by the remote. func TestStreamReaderDialDetectUnsupport(t *testing.T) { @@ -248,32 +209,22 @@ func TestStream(t *testing.T) { } tests := []struct { - t streamType - term uint64 - m raftpb.Message - wc chan raftpb.Message + t streamType + m raftpb.Message + wc chan raftpb.Message }{ { streamTypeMessage, - 0, raftpb.Message{Type: raftpb.MsgProp, To: 2}, propc, }, { streamTypeMessage, - 0, - msgapp, - recvc, - }, - { - streamTypeMsgApp, - 1, msgapp, recvc, }, { streamTypeMsgAppV2, - 0, msgapp, recvc, }, @@ -288,7 +239,7 @@ func TestStream(t *testing.T) { h.sw = sw picker := mustNewURLPicker(t, []string{srv.URL}) - sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil, tt.term) + sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil) defer sr.stop() // wait for stream to work var writec chan<- raftpb.Message @@ -321,27 +272,21 @@ func TestCheckStreamSupport(t *testing.T) { }{ // support { - semver.Must(semver.NewVersion("2.0.0")), - streamTypeMsgApp, + semver.Must(semver.NewVersion("2.1.0")), + streamTypeMsgAppV2, true, }, // ignore patch { - semver.Must(semver.NewVersion("2.0.9")), - streamTypeMsgApp, + semver.Must(semver.NewVersion("2.1.9")), + streamTypeMsgAppV2, true, }, // ignore prerelease { - semver.Must(semver.NewVersion("2.0.0-alpha")), - streamTypeMsgApp, - true, - }, - // not support - { - semver.Must(semver.NewVersion("2.0.0")), + semver.Must(semver.NewVersion("2.1.0-alpha")), streamTypeMsgAppV2, - false, + true, }, } for i, tt := range tests { @@ -378,7 +323,6 @@ func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { c := newCloseNotifier() h.sw.attach(&outgoingConn{ t: h.t, - termStr: r.Header.Get("X-Raft-Term"), Writer: w, Flusher: w.(http.Flusher), Closer: c, diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 0a88d5da3..e9fb79e7d 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -121,8 +121,7 @@ type Transport struct { streamRt http.RoundTripper // roundTripper used by streams pipelineRt http.RoundTripper // roundTripper used by pipelines - mu sync.RWMutex // protect the term, remote and peer map - term uint64 // the latest term that has been observed + mu sync.RWMutex // protect the remote and peer map remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map @@ -169,18 +168,6 @@ func (t *Transport) Get(id types.ID) Peer { return t.peers[id] } -func (t *Transport) maybeUpdatePeersTerm(term uint64) { - t.mu.Lock() - defer t.mu.Unlock() - if t.term >= term { - return - } - t.term = term - for _, p := range t.peers { - p.setTerm(term) - } -} - func (t *Transport) Send(msgs []raftpb.Message) { for _, m := range msgs { if m.To == 0 { @@ -189,12 +176,6 @@ func (t *Transport) Send(msgs []raftpb.Message) { } to := types.ID(m.To) - // update terms for all the peers - // ignore MsgProp since it does not have a valid term - if m.Type != raftpb.MsgProp { - t.maybeUpdatePeersTerm(m.Term) - } - p, ok := t.peers[to] if ok { if m.Type == raftpb.MsgApp { @@ -254,7 +235,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.LeaderStats.Follower(id.String()) - t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.term, t.V3demo) + t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.V3demo) addPeerToProber(t.prober, id.String(), us) } diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index df45ab4d5..43a46a925 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -68,11 +68,9 @@ func TestTransportSend(t *testing.T) { func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") - term := uint64(10) tr := &Transport{ LeaderStats: ls, streamRt: &roundTripperRecorder{}, - term: term, peers: make(map[types.ID]Peer), prober: probing.NewProber(nil), } @@ -95,10 +93,6 @@ func TestTransportAdd(t *testing.T) { } tr.Stop() - - if g := s.(*peer).msgAppReader.msgAppTerm; g != term { - t.Errorf("peer.term = %d, want %d", g, term) - } } func TestTransportRemove(t *testing.T) { From 33231fccdd03337a56e03e87cc4a3a7e561e6cb4 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 20 Oct 2015 08:17:06 -0700 Subject: [PATCH 2/4] rafthttp: fix wrong stream name returned by pick msgAppWriter uses streamAppV2 type, and it should return the correct name. --- rafthttp/peer.go | 3 +-- rafthttp/peer_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 5294e938a..154bc6305 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -45,7 +45,6 @@ const ( // to hold all proposals. maxPendingProposals = 4096 - streamApp = "streamMsgApp" streamAppV2 = "streamMsgAppV2" streamMsg = "streamMsg" pipelineMsg = "pipeline" @@ -266,7 +265,7 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg } else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) { - return writec, streamApp + return writec, streamAppV2 } else if writec, ok = p.writer.writec(); ok { return writec, streamMsg } diff --git a/rafthttp/peer_test.go b/rafthttp/peer_test.go index b35d50c8e..0ad2f82b3 100644 --- a/rafthttp/peer_test.go +++ b/rafthttp/peer_test.go @@ -35,7 +35,7 @@ func TestPeerPick(t *testing.T) { { true, true, raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1}, - streamApp, + streamAppV2, }, { true, true, From 5060b2f322a4ed31ed26534b403b0e30416531b4 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 20 Oct 2015 00:02:59 -0700 Subject: [PATCH 3/4] rafthttp: send all MsgApp on stream msgAppV2 For stream msgAppV2, as long as the message is MsgApp type, it should be sent through stream msgAppV2. --- rafthttp/peer.go | 4 +++- rafthttp/stream.go | 4 ---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 154bc6305..5b70228df 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -264,7 +264,7 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri // stream for a long time, only use one of the N pipelines to send MsgSnap. if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg - } else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) { + } else if writec, ok = p.msgAppWriter.writec(); ok && isMsgApp(m) { return writec, streamAppV2 } else if writec, ok = p.writer.writec(); ok { return writec, streamMsg @@ -272,4 +272,6 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri return p.pipeline.msgc, pipelineMsg } +func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp } + func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 5a9f9e0ec..566806cc7 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -441,10 +441,6 @@ func (cr *streamReader) close() { cr.closer = nil } -func canUseMsgAppStream(m raftpb.Message) bool { - return m.Type == raftpb.MsgApp && m.Term == m.LogTerm -} - func isClosedConnectionError(err error) bool { operr, ok := err.(*net.OpError) return ok && operr.Err.Error() == "use of closed network connection" From b61eaf33356f7485f4b65d3a4aea8ac5d38ccd72 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 20 Oct 2015 08:28:06 -0700 Subject: [PATCH 4/4] rafthttp: msgApp{Reader/Writer} -> msgAppV2{Reader/Writer} To make what it serves more clear. --- rafthttp/functional_test.go | 2 +- rafthttp/peer.go | 52 ++++++++++++++++++------------------- rafthttp/peer_test.go | 6 ++--- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/rafthttp/functional_test.go b/rafthttp/functional_test.go index 50ae4c061..e07c0ba73 100644 --- a/rafthttp/functional_test.go +++ b/rafthttp/functional_test.go @@ -148,7 +148,7 @@ func newServerStats() *stats.ServerStats { func waitStreamWorking(p *peer) bool { for i := 0; i < 1000; i++ { time.Sleep(time.Millisecond) - if _, ok := p.msgAppWriter.writec(); !ok { + if _, ok := p.msgAppV2Writer.writec(); !ok { continue } if _, ok := p.writer.writec(); !ok { diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 5b70228df..8a95d5031 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -91,11 +91,11 @@ type peer struct { status *peerStatus - msgAppWriter *streamWriter - writer *streamWriter - pipeline *pipeline - snapSender *snapshotSender // snapshot sender to send v3 snapshot messages - msgAppReader *streamReader + msgAppV2Writer *streamWriter + writer *streamWriter + pipeline *pipeline + snapSender *snapshotSender // snapshot sender to send v3 snapshot messages + msgAppV2Reader *streamReader sendc chan raftpb.Message recvc chan raftpb.Message @@ -114,22 +114,22 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t picker := newURLPicker(urls) status := newPeerStatus(to) p := &peer{ - id: to, - r: r, - v3demo: v3demo, - status: status, - msgAppWriter: startStreamWriter(to, status, fs, r), - writer: startStreamWriter(to, status, fs, r), - pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), - snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc), - sendc: make(chan raftpb.Message), - recvc: make(chan raftpb.Message, recvBufSize), - propc: make(chan raftpb.Message, maxPendingProposals), - newURLsC: make(chan types.URLs), - pausec: make(chan struct{}), - resumec: make(chan struct{}), - stopc: make(chan struct{}), - done: make(chan struct{}), + id: to, + r: r, + v3demo: v3demo, + status: status, + msgAppV2Writer: startStreamWriter(to, status, fs, r), + writer: startStreamWriter(to, status, fs, r), + pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), + snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc), + sendc: make(chan raftpb.Message), + recvc: make(chan raftpb.Message, recvBufSize), + propc: make(chan raftpb.Message, maxPendingProposals), + newURLsC: make(chan types.URLs), + pausec: make(chan struct{}), + resumec: make(chan struct{}), + stopc: make(chan struct{}), + done: make(chan struct{}), } // Use go-routine for process of MsgProp because it is @@ -148,7 +148,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t } }() - p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) + p.msgAppV2Reader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) go func() { var paused bool @@ -188,11 +188,11 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t paused = false case <-p.stopc: cancel() - p.msgAppWriter.stop() + p.msgAppV2Writer.stop() p.writer.stop() p.pipeline.stop() p.snapSender.stop() - p.msgAppReader.stop() + p.msgAppV2Reader.stop() reader.stop() close(p.done) return @@ -221,7 +221,7 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { case streamTypeMsgAppV2: - ok = p.msgAppWriter.attach(conn) + ok = p.msgAppV2Writer.attach(conn) case streamTypeMessage: ok = p.writer.attach(conn) default: @@ -264,7 +264,7 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri // stream for a long time, only use one of the N pipelines to send MsgSnap. if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg - } else if writec, ok = p.msgAppWriter.writec(); ok && isMsgApp(m) { + } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) { return writec, streamAppV2 } else if writec, ok = p.writer.writec(); ok { return writec, streamMsg diff --git a/rafthttp/peer_test.go b/rafthttp/peer_test.go index 0ad2f82b3..1b5dd66df 100644 --- a/rafthttp/peer_test.go +++ b/rafthttp/peer_test.go @@ -75,9 +75,9 @@ func TestPeerPick(t *testing.T) { } for i, tt := range tests { peer := &peer{ - msgAppWriter: &streamWriter{working: tt.msgappWorking}, - writer: &streamWriter{working: tt.messageWorking}, - pipeline: &pipeline{}, + msgAppV2Writer: &streamWriter{working: tt.msgappWorking}, + writer: &streamWriter{working: tt.messageWorking}, + pipeline: &pipeline{}, } _, picked := peer.pick(tt.m) if picked != tt.wpicked {