rafthttp: deprecate streamTypeMsgApp

streamTypeMsgApp is only used in etcd 2.0. etcd 2.3 should not talk to
etcd 2.0, either send or receive requests. So I deprecate streamTypeMsgApp
and its related stuffs from rafthttp package.

updating term is only used from streamTypeMsgApp, so it is removed too.
This commit is contained in:
Yicheng Qin 2015-10-19 23:41:01 -07:00
parent 7dcb99b60e
commit f725f6a552
10 changed files with 46 additions and 377 deletions

View File

@ -66,7 +66,6 @@ func TestSendMessage(t *testing.T) {
tests := []raftpb.Message{ tests := []raftpb.Message{
// these messages are set to send to itself, which facilitates testing. // these messages are set to send to itself, which facilitates testing.
{Type: raftpb.MsgProp, From: 1, To: 2, Entries: []raftpb.Entry{{Data: data}}}, {Type: raftpb.MsgProp, From: 1, To: 2, Entries: []raftpb.Entry{{Data: data}}},
// TODO: send out MsgApp which fits msgapp stream but the term doesn't match
{Type: raftpb.MsgApp, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0, Entries: []raftpb.Entry{{Index: 4, Term: 1, Data: data}}, Commit: 3}, {Type: raftpb.MsgApp, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0, Entries: []raftpb.Entry{{Index: 4, Term: 1, Data: data}}, Commit: 3},
{Type: raftpb.MsgAppResp, From: 1, To: 2, Term: 1, Index: 3}, {Type: raftpb.MsgAppResp, From: 1, To: 2, Term: 1, Index: 3},
{Type: raftpb.MsgVote, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0}, {Type: raftpb.MsgVote, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0},

View File

@ -228,12 +228,9 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var t streamType var t streamType
switch path.Dir(r.URL.Path) { switch path.Dir(r.URL.Path) {
// backward compatibility case streamTypeMsgAppV2.endpoint():
case RaftStreamPrefix:
t = streamTypeMsgApp
case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)):
t = streamTypeMsgAppV2 t = streamTypeMsgAppV2
case path.Join(RaftStreamPrefix, string(streamTypeMessage)): case streamTypeMessage.endpoint():
t = streamTypeMessage t = streamTypeMessage
default: default:
plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
@ -278,7 +275,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := newCloseNotifier() c := newCloseNotifier()
conn := &outgoingConn{ conn := &outgoingConn{
t: t, t: t,
termStr: r.Header.Get("X-Raft-Term"),
Writer: w, Writer: w,
Flusher: w.(http.Flusher), Flusher: w.(http.Flusher),
Closer: c, Closer: c,

View File

@ -170,11 +170,6 @@ func TestServeRaftStreamPrefix(t *testing.T) {
RaftStreamPrefix + "/msgapp/1", RaftStreamPrefix + "/msgapp/1",
streamTypeMsgAppV2, streamTypeMsgAppV2,
}, },
// backward compatibility
{
RaftStreamPrefix + "/1",
streamTypeMsgApp,
},
} }
for i, tt := range tests { for i, tt := range tests {
req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil) req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil)
@ -184,8 +179,6 @@ func TestServeRaftStreamPrefix(t *testing.T) {
req.Header.Set("X-Etcd-Cluster-ID", "1") req.Header.Set("X-Etcd-Cluster-ID", "1")
req.Header.Set("X-Server-Version", version.Version) req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Raft-To", "2") req.Header.Set("X-Raft-To", "2")
wterm := "1"
req.Header.Set("X-Raft-Term", wterm)
peer := newFakePeer() peer := newFakePeer()
peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}} peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
@ -206,9 +199,6 @@ func TestServeRaftStreamPrefix(t *testing.T) {
if conn.t != tt.wtype { if conn.t != tt.wtype {
t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype) t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype)
} }
if conn.termStr != wterm {
t.Errorf("#%d: term = %s, want %s", i, conn.termStr, wterm)
}
conn.Close() conn.Close()
} }
} }
@ -352,7 +342,6 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
type fakePeer struct { type fakePeer struct {
msgs []raftpb.Message msgs []raftpb.Message
urls types.URLs urls types.URLs
term uint64
connc chan *outgoingConn connc chan *outgoingConn
} }
@ -364,7 +353,6 @@ func newFakePeer() *fakePeer {
func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls } func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls }
func (pr *fakePeer) setTerm(term uint64) { pr.term = term }
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
func (pr *fakePeer) activeSince() time.Time { return time.Time{} } func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
func (pr *fakePeer) stop() {} func (pr *fakePeer) stop() {}

View File

@ -1,98 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"encoding/binary"
"io"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
// msgAppEncoder is a optimized encoder for append messages. It assumes
// that the decoder has enough information to recover the fields except
// Entries, and it writes only Entries into the Writer.
// It MUST be used with a paired msgAppDecoder.
type msgAppEncoder struct {
w io.Writer
// TODO: move the fs stats and use new metrics
fs *stats.FollowerStats
}
func (enc *msgAppEncoder) encode(m raftpb.Message) error {
if isLinkHeartbeatMessage(m) {
return binary.Write(enc.w, binary.BigEndian, uint64(0))
}
start := time.Now()
ents := m.Entries
l := len(ents)
// There is no need to send empty ents, and it avoids confusion with
// heartbeat.
if l == 0 {
return nil
}
if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil {
return err
}
for i := 0; i < l; i++ {
ent := &ents[i]
if err := writeEntryTo(enc.w, ent); err != nil {
return err
}
}
enc.fs.Succ(time.Since(start))
return nil
}
// msgAppDecoder is a optimized decoder for append messages. It reads data
// from the Reader and parses it into Entries, then builds messages.
type msgAppDecoder struct {
r io.Reader
local, remote types.ID
term uint64
}
func (dec *msgAppDecoder) decode() (raftpb.Message, error) {
var m raftpb.Message
var l uint64
if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
return m, err
}
if l == 0 {
return linkHeartbeatMessage, nil
}
ents := make([]raftpb.Entry, int(l))
for i := 0; i < int(l); i++ {
ent := &ents[i]
if err := readEntryFrom(dec.r, ent); err != nil {
return m, err
}
}
m = raftpb.Message{
Type: raftpb.MsgApp,
From: uint64(dec.remote),
To: uint64(dec.local),
Term: dec.term,
LogTerm: dec.term,
Index: ents[0].Index - 1,
Entries: ents,
}
return m, nil
}

View File

@ -1,70 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"bytes"
"reflect"
"testing"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
func TestMsgApp(t *testing.T) {
tests := []raftpb.Message{
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 1,
LogTerm: 1,
Index: 3,
Entries: []raftpb.Entry{{Term: 1, Index: 4}},
},
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 1,
LogTerm: 1,
Index: 0,
Entries: []raftpb.Entry{
{Term: 1, Index: 1, Data: []byte("some data")},
{Term: 1, Index: 2, Data: []byte("some data")},
{Term: 1, Index: 3, Data: []byte("some data")},
},
},
linkHeartbeatMessage,
}
for i, tt := range tests {
b := &bytes.Buffer{}
enc := &msgAppEncoder{w: b, fs: &stats.FollowerStats{}}
if err := enc.encode(tt); err != nil {
t.Errorf("#%d: unexpected encode message error: %v", i, err)
continue
}
dec := &msgAppDecoder{r: b, local: types.ID(tt.To), remote: types.ID(tt.From), term: tt.Term}
m, err := dec.decode()
if err != nil {
t.Errorf("#%d: unexpected decode message error: %v", i, err)
continue
}
if !reflect.DeepEqual(m, tt) {
t.Errorf("#%d: message = %+v, want %+v", i, m, tt)
}
}
}

View File

@ -60,8 +60,6 @@ type Peer interface {
send(m raftpb.Message) send(m raftpb.Message)
// update updates the urls of remote peer. // update updates the urls of remote peer.
update(urls types.URLs) update(urls types.URLs)
// setTerm sets the term of ongoing communication.
setTerm(term uint64)
// attachOutgoingConn attachs the outgoing connection to the peer for // attachOutgoingConn attachs the outgoing connection to the peer for
// stream usage. After the call, the ownership of the outgoing // stream usage. After the call, the ownership of the outgoing
// connection hands over to the peer. The peer will close the connection // connection hands over to the peer. The peer will close the connection
@ -104,7 +102,6 @@ type peer struct {
recvc chan raftpb.Message recvc chan raftpb.Message
propc chan raftpb.Message propc chan raftpb.Message
newURLsC chan types.URLs newURLsC chan types.URLs
termc chan uint64
// for testing // for testing
pausec chan struct{} pausec chan struct{}
@ -114,7 +111,7 @@ type peer struct {
done chan struct{} done chan struct{}
} }
func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64, v3demo bool) *peer { func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
picker := newURLPicker(urls) picker := newURLPicker(urls)
status := newPeerStatus(to) status := newPeerStatus(to)
p := &peer{ p := &peer{
@ -130,7 +127,6 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
recvc: make(chan raftpb.Message, recvBufSize), recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals), propc: make(chan raftpb.Message, maxPendingProposals),
newURLsC: make(chan types.URLs), newURLsC: make(chan types.URLs),
termc: make(chan uint64),
pausec: make(chan struct{}), pausec: make(chan struct{}),
resumec: make(chan struct{}), resumec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
@ -153,8 +149,8 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
} }
}() }()
p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc, term) p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc, term) reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
go func() { go func() {
var paused bool var paused bool
for { for {
@ -222,12 +218,10 @@ func (p *peer) update(urls types.URLs) {
} }
} }
func (p *peer) setTerm(term uint64) { p.msgAppReader.updateMsgAppTerm(term) }
func (p *peer) attachOutgoingConn(conn *outgoingConn) { func (p *peer) attachOutgoingConn(conn *outgoingConn) {
var ok bool var ok bool
switch conn.t { switch conn.t {
case streamTypeMsgApp, streamTypeMsgAppV2: case streamTypeMsgAppV2:
ok = p.msgAppWriter.attach(conn) ok = p.msgAppWriter.attach(conn)
case streamTypeMessage: case streamTypeMessage:
ok = p.writer.attach(conn) ok = p.writer.attach(conn)

View File

@ -21,7 +21,6 @@ import (
"net" "net"
"net/http" "net/http"
"path" "path"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -37,7 +36,6 @@ import (
const ( const (
streamTypeMessage streamType = "message" streamTypeMessage streamType = "message"
streamTypeMsgAppV2 streamType = "msgappv2" streamTypeMsgAppV2 streamType = "msgappv2"
streamTypeMsgApp streamType = "msgapp"
streamBufSize = 4096 streamBufSize = 4096
) )
@ -47,9 +45,9 @@ var (
// the key is in string format "major.minor.patch" // the key is in string format "major.minor.patch"
supportedStream = map[string][]streamType{ supportedStream = map[string][]streamType{
"2.0.0": {streamTypeMsgApp}, "2.0.0": {},
"2.1.0": {streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage}, "2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage}, "2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
} }
) )
@ -57,8 +55,6 @@ type streamType string
func (t streamType) endpoint() string { func (t streamType) endpoint() string {
switch t { switch t {
case streamTypeMsgApp: // for backward compatibility of v2.0
return RaftStreamPrefix
case streamTypeMsgAppV2: case streamTypeMsgAppV2:
return path.Join(RaftStreamPrefix, "msgapp") return path.Join(RaftStreamPrefix, "msgapp")
case streamTypeMessage: case streamTypeMessage:
@ -71,8 +67,6 @@ func (t streamType) endpoint() string {
func (t streamType) String() string { func (t streamType) String() string {
switch t { switch t {
case streamTypeMsgApp:
return "stream MsgApp"
case streamTypeMsgAppV2: case streamTypeMsgAppV2:
return "stream MsgApp v2" return "stream MsgApp v2"
case streamTypeMessage: case streamTypeMessage:
@ -95,7 +89,6 @@ func isLinkHeartbeatMessage(m raftpb.Message) bool {
type outgoingConn struct { type outgoingConn struct {
t streamType t streamType
termStr string
io.Writer io.Writer
http.Flusher http.Flusher
io.Closer io.Closer
@ -138,7 +131,6 @@ func (cw *streamWriter) run() {
var msgc chan raftpb.Message var msgc chan raftpb.Message
var heartbeatc <-chan time.Time var heartbeatc <-chan time.Time
var t streamType var t streamType
var msgAppTerm uint64
var enc encoder var enc encoder
var flusher http.Flusher var flusher http.Flusher
tickc := time.Tick(ConnReadTimeout / 3) tickc := time.Tick(ConnReadTimeout / 3)
@ -158,16 +150,6 @@ func (cw *streamWriter) run() {
flusher.Flush() flusher.Flush()
reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start)) reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
case m := <-msgc: case m := <-msgc:
if t == streamTypeMsgApp && m.Term != msgAppTerm {
// TODO: reasonable retry logic
if m.Term > msgAppTerm {
cw.close()
heartbeatc, msgc = nil, nil
// TODO: report to raft at peer level
cw.r.ReportUnreachable(m.To)
}
continue
}
start := time.Now() start := time.Now()
if err := enc.encode(m); err != nil { if err := enc.encode(m); err != nil {
reportSentFailure(string(t), m) reportSentFailure(string(t), m)
@ -184,13 +166,6 @@ func (cw *streamWriter) run() {
cw.close() cw.close()
t = conn.t t = conn.t
switch conn.t { switch conn.t {
case streamTypeMsgApp:
var err error
msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64)
if err != nil {
plog.Panicf("could not parse term %s to uint (%v)", conn.termStr, err)
}
enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
case streamTypeMsgAppV2: case streamTypeMsgAppV2:
enc = newMsgAppV2Encoder(conn.Writer, cw.fs) enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
case streamTypeMessage: case streamTypeMessage:
@ -261,14 +236,13 @@ type streamReader struct {
errorc chan<- error errorc chan<- error
mu sync.Mutex mu sync.Mutex
msgAppTerm uint64
cancel func() cancel func()
closer io.Closer closer io.Closer
stopc chan struct{} stopc chan struct{}
done chan struct{} done chan struct{}
} }
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error, term uint64) *streamReader { func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
r := &streamReader{ r := &streamReader{
tr: tr, tr: tr,
picker: picker, picker: picker,
@ -280,7 +254,6 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, lo
recvc: recvc, recvc: recvc,
propc: propc, propc: propc,
errorc: errorc, errorc: errorc,
msgAppTerm: term,
stopc: make(chan struct{}), stopc: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -292,12 +265,6 @@ func (cr *streamReader) run() {
for { for {
t := cr.t t := cr.t
rc, err := cr.dial(t) rc, err := cr.dial(t)
// downgrade to streamTypeMsgApp if the remote doesn't support
// streamTypeMsgAppV2
if t == streamTypeMsgAppV2 && err == errUnsupportedStreamType {
t = streamTypeMsgApp
rc, err = cr.dial(t)
}
if err != nil { if err != nil {
if err != errUnsupportedStreamType { if err != errUnsupportedStreamType {
cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error()) cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
@ -310,9 +277,6 @@ func (cr *streamReader) run() {
case err == io.EOF: case err == io.EOF:
// connection is closed by the remote // connection is closed by the remote
case isClosedConnectionError(err): case isClosedConnectionError(err):
// stream msgapp is only used for etcd 2.0, and etcd 2.0 doesn't
// heartbeat on the idle stream, so it is expected to time out.
case t == streamTypeMsgApp && isNetworkTimeoutError(err):
default: default:
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
} }
@ -332,8 +296,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
var dec decoder var dec decoder
cr.mu.Lock() cr.mu.Lock()
switch t { switch t {
case streamTypeMsgApp:
dec = &msgAppDecoder{r: rc, local: cr.local, remote: cr.remote, term: cr.msgAppTerm}
case streamTypeMsgAppV2: case streamTypeMsgAppV2:
dec = newMsgAppV2Decoder(rc, cr.local, cr.remote) dec = newMsgAppV2Decoder(rc, cr.local, cr.remote)
case streamTypeMessage: case streamTypeMessage:
@ -377,20 +339,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
} }
} }
// updateMsgAppTerm updates the term for MsgApp stream, and closes
// the existing MsgApp stream if term is updated.
func (cr *streamReader) updateMsgAppTerm(term uint64) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.msgAppTerm >= term {
return
}
cr.msgAppTerm = term
if cr.t == streamTypeMsgApp {
cr.close()
}
}
func (cr *streamReader) stop() { func (cr *streamReader) stop() {
close(cr.stopc) close(cr.stopc)
cr.mu.Lock() cr.mu.Lock()
@ -410,10 +358,6 @@ func (cr *streamReader) isWorking() bool {
func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
u := cr.picker.pick() u := cr.picker.pick()
cr.mu.Lock()
term := cr.msgAppTerm
cr.mu.Unlock()
uu := u uu := u
uu.Path = path.Join(t.endpoint(), cr.local.String()) uu.Path = path.Join(t.endpoint(), cr.local.String())
@ -427,9 +371,6 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
req.Header.Set("X-Raft-To", cr.remote.String()) req.Header.Set("X-Raft-To", cr.remote.String())
if t == streamTypeMsgApp {
req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
}
cr.mu.Lock() cr.mu.Lock()
select { select {

View File

@ -98,7 +98,7 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
} }
func TestStreamReaderDialRequest(t *testing.T) { func TestStreamReaderDialRequest(t *testing.T) {
for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage, streamTypeMsgAppV2} { for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
tr := &roundTripperRecorder{} tr := &roundTripperRecorder{}
sr := &streamReader{ sr := &streamReader{
tr: tr, tr: tr,
@ -106,7 +106,6 @@ func TestStreamReaderDialRequest(t *testing.T) {
local: types.ID(1), local: types.ID(1),
remote: types.ID(2), remote: types.ID(2),
cid: types.ID(1), cid: types.ID(1),
msgAppTerm: 1,
} }
sr.dial(tt) sr.dial(tt)
@ -124,9 +123,6 @@ func TestStreamReaderDialRequest(t *testing.T) {
if g := req.Header.Get("X-Raft-To"); g != "2" { if g := req.Header.Get("X-Raft-To"); g != "2" {
t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g) t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
} }
if g := req.Header.Get("X-Raft-Term"); tt == streamTypeMsgApp && g != "1" {
t.Errorf("#%d: header X-Raft-Term = %s, want 1", i, g)
}
} }
} }
@ -173,41 +169,6 @@ func TestStreamReaderDialResult(t *testing.T) {
} }
} }
func TestStreamReaderUpdateMsgAppTerm(t *testing.T) {
term := uint64(2)
tests := []struct {
term uint64
typ streamType
wterm uint64
wclose bool
}{
// lower term
{1, streamTypeMsgApp, 2, false},
// unchanged term
{2, streamTypeMsgApp, 2, false},
// higher term
{3, streamTypeMessage, 3, false},
{3, streamTypeMsgAppV2, 3, false},
// higher term, reset closer
{3, streamTypeMsgApp, 3, true},
}
for i, tt := range tests {
closer := &fakeWriteFlushCloser{}
cr := &streamReader{
msgAppTerm: term,
t: tt.typ,
closer: closer,
}
cr.updateMsgAppTerm(tt.term)
if cr.msgAppTerm != tt.wterm {
t.Errorf("#%d: term = %d, want %d", i, cr.msgAppTerm, tt.wterm)
}
if closer.closed != tt.wclose {
t.Errorf("#%d: closed = %v, want %v", i, closer.closed, tt.wclose)
}
}
}
// TestStreamReaderDialDetectUnsupport tests that dial func could find // TestStreamReaderDialDetectUnsupport tests that dial func could find
// out that the stream type is not supported by the remote. // out that the stream type is not supported by the remote.
func TestStreamReaderDialDetectUnsupport(t *testing.T) { func TestStreamReaderDialDetectUnsupport(t *testing.T) {
@ -249,31 +210,21 @@ func TestStream(t *testing.T) {
tests := []struct { tests := []struct {
t streamType t streamType
term uint64
m raftpb.Message m raftpb.Message
wc chan raftpb.Message wc chan raftpb.Message
}{ }{
{ {
streamTypeMessage, streamTypeMessage,
0,
raftpb.Message{Type: raftpb.MsgProp, To: 2}, raftpb.Message{Type: raftpb.MsgProp, To: 2},
propc, propc,
}, },
{ {
streamTypeMessage, streamTypeMessage,
0,
msgapp,
recvc,
},
{
streamTypeMsgApp,
1,
msgapp, msgapp,
recvc, recvc,
}, },
{ {
streamTypeMsgAppV2, streamTypeMsgAppV2,
0,
msgapp, msgapp,
recvc, recvc,
}, },
@ -288,7 +239,7 @@ func TestStream(t *testing.T) {
h.sw = sw h.sw = sw
picker := mustNewURLPicker(t, []string{srv.URL}) picker := mustNewURLPicker(t, []string{srv.URL})
sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil, tt.term) sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil)
defer sr.stop() defer sr.stop()
// wait for stream to work // wait for stream to work
var writec chan<- raftpb.Message var writec chan<- raftpb.Message
@ -321,27 +272,21 @@ func TestCheckStreamSupport(t *testing.T) {
}{ }{
// support // support
{ {
semver.Must(semver.NewVersion("2.0.0")), semver.Must(semver.NewVersion("2.1.0")),
streamTypeMsgApp, streamTypeMsgAppV2,
true, true,
}, },
// ignore patch // ignore patch
{ {
semver.Must(semver.NewVersion("2.0.9")), semver.Must(semver.NewVersion("2.1.9")),
streamTypeMsgApp, streamTypeMsgAppV2,
true, true,
}, },
// ignore prerelease // ignore prerelease
{ {
semver.Must(semver.NewVersion("2.0.0-alpha")), semver.Must(semver.NewVersion("2.1.0-alpha")),
streamTypeMsgApp,
true,
},
// not support
{
semver.Must(semver.NewVersion("2.0.0")),
streamTypeMsgAppV2, streamTypeMsgAppV2,
false, true,
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -378,7 +323,6 @@ func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := newCloseNotifier() c := newCloseNotifier()
h.sw.attach(&outgoingConn{ h.sw.attach(&outgoingConn{
t: h.t, t: h.t,
termStr: r.Header.Get("X-Raft-Term"),
Writer: w, Writer: w,
Flusher: w.(http.Flusher), Flusher: w.(http.Flusher),
Closer: c, Closer: c,

View File

@ -121,8 +121,7 @@ type Transport struct {
streamRt http.RoundTripper // roundTripper used by streams streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines pipelineRt http.RoundTripper // roundTripper used by pipelines
mu sync.RWMutex // protect the term, remote and peer map mu sync.RWMutex // protect the remote and peer map
term uint64 // the latest term that has been observed
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map peers map[types.ID]Peer // peers map
@ -169,18 +168,6 @@ func (t *Transport) Get(id types.ID) Peer {
return t.peers[id] return t.peers[id]
} }
func (t *Transport) maybeUpdatePeersTerm(term uint64) {
t.mu.Lock()
defer t.mu.Unlock()
if t.term >= term {
return
}
t.term = term
for _, p := range t.peers {
p.setTerm(term)
}
}
func (t *Transport) Send(msgs []raftpb.Message) { func (t *Transport) Send(msgs []raftpb.Message) {
for _, m := range msgs { for _, m := range msgs {
if m.To == 0 { if m.To == 0 {
@ -189,12 +176,6 @@ func (t *Transport) Send(msgs []raftpb.Message) {
} }
to := types.ID(m.To) to := types.ID(m.To)
// update terms for all the peers
// ignore MsgProp since it does not have a valid term
if m.Type != raftpb.MsgProp {
t.maybeUpdatePeersTerm(m.Term)
}
p, ok := t.peers[to] p, ok := t.peers[to]
if ok { if ok {
if m.Type == raftpb.MsgApp { if m.Type == raftpb.MsgApp {
@ -254,7 +235,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
plog.Panicf("newURLs %+v should never fail: %+v", us, err) plog.Panicf("newURLs %+v should never fail: %+v", us, err)
} }
fs := t.LeaderStats.Follower(id.String()) fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.term, t.V3demo) t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.V3demo)
addPeerToProber(t.prober, id.String(), us) addPeerToProber(t.prober, id.String(), us)
} }

View File

@ -68,11 +68,9 @@ func TestTransportSend(t *testing.T) {
func TestTransportAdd(t *testing.T) { func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("") ls := stats.NewLeaderStats("")
term := uint64(10)
tr := &Transport{ tr := &Transport{
LeaderStats: ls, LeaderStats: ls,
streamRt: &roundTripperRecorder{}, streamRt: &roundTripperRecorder{},
term: term,
peers: make(map[types.ID]Peer), peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil), prober: probing.NewProber(nil),
} }
@ -95,10 +93,6 @@ func TestTransportAdd(t *testing.T) {
} }
tr.Stop() tr.Stop()
if g := s.(*peer).msgAppReader.msgAppTerm; g != term {
t.Errorf("peer.term = %d, want %d", g, term)
}
} }
func TestTransportRemove(t *testing.T) { func TestTransportRemove(t *testing.T) {