Merge pull request #3720 from yichengq/clean-streamAppV1

rafthttp: deprecate streamTypeMsgApp and remove msgApp stream sent restriction due to streamTypeMsgApp
This commit is contained in:
Yicheng Qin 2015-10-20 10:37:51 -07:00
commit 9635d8d94c
11 changed files with 79 additions and 413 deletions

View File

@ -66,7 +66,6 @@ func TestSendMessage(t *testing.T) {
tests := []raftpb.Message{
// these messages are set to send to itself, which facilitates testing.
{Type: raftpb.MsgProp, From: 1, To: 2, Entries: []raftpb.Entry{{Data: data}}},
// TODO: send out MsgApp which fits msgapp stream but the term doesn't match
{Type: raftpb.MsgApp, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0, Entries: []raftpb.Entry{{Index: 4, Term: 1, Data: data}}, Commit: 3},
{Type: raftpb.MsgAppResp, From: 1, To: 2, Term: 1, Index: 3},
{Type: raftpb.MsgVote, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0},
@ -149,7 +148,7 @@ func newServerStats() *stats.ServerStats {
func waitStreamWorking(p *peer) bool {
for i := 0; i < 1000; i++ {
time.Sleep(time.Millisecond)
if _, ok := p.msgAppWriter.writec(); !ok {
if _, ok := p.msgAppV2Writer.writec(); !ok {
continue
}
if _, ok := p.writer.writec(); !ok {

View File

@ -228,12 +228,9 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var t streamType
switch path.Dir(r.URL.Path) {
// backward compatibility
case RaftStreamPrefix:
t = streamTypeMsgApp
case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)):
case streamTypeMsgAppV2.endpoint():
t = streamTypeMsgAppV2
case path.Join(RaftStreamPrefix, string(streamTypeMessage)):
case streamTypeMessage.endpoint():
t = streamTypeMessage
default:
plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
@ -278,7 +275,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := newCloseNotifier()
conn := &outgoingConn{
t: t,
termStr: r.Header.Get("X-Raft-Term"),
Writer: w,
Flusher: w.(http.Flusher),
Closer: c,

View File

@ -170,11 +170,6 @@ func TestServeRaftStreamPrefix(t *testing.T) {
RaftStreamPrefix + "/msgapp/1",
streamTypeMsgAppV2,
},
// backward compatibility
{
RaftStreamPrefix + "/1",
streamTypeMsgApp,
},
}
for i, tt := range tests {
req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil)
@ -184,8 +179,6 @@ func TestServeRaftStreamPrefix(t *testing.T) {
req.Header.Set("X-Etcd-Cluster-ID", "1")
req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Raft-To", "2")
wterm := "1"
req.Header.Set("X-Raft-Term", wterm)
peer := newFakePeer()
peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
@ -206,9 +199,6 @@ func TestServeRaftStreamPrefix(t *testing.T) {
if conn.t != tt.wtype {
t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype)
}
if conn.termStr != wterm {
t.Errorf("#%d: term = %s, want %s", i, conn.termStr, wterm)
}
conn.Close()
}
}
@ -352,7 +342,6 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
type fakePeer struct {
msgs []raftpb.Message
urls types.URLs
term uint64
connc chan *outgoingConn
}
@ -364,7 +353,6 @@ func newFakePeer() *fakePeer {
func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls }
func (pr *fakePeer) setTerm(term uint64) { pr.term = term }
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
func (pr *fakePeer) stop() {}

View File

@ -1,98 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"encoding/binary"
"io"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
// msgAppEncoder is a optimized encoder for append messages. It assumes
// that the decoder has enough information to recover the fields except
// Entries, and it writes only Entries into the Writer.
// It MUST be used with a paired msgAppDecoder.
type msgAppEncoder struct {
w io.Writer
// TODO: move the fs stats and use new metrics
fs *stats.FollowerStats
}
func (enc *msgAppEncoder) encode(m raftpb.Message) error {
if isLinkHeartbeatMessage(m) {
return binary.Write(enc.w, binary.BigEndian, uint64(0))
}
start := time.Now()
ents := m.Entries
l := len(ents)
// There is no need to send empty ents, and it avoids confusion with
// heartbeat.
if l == 0 {
return nil
}
if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil {
return err
}
for i := 0; i < l; i++ {
ent := &ents[i]
if err := writeEntryTo(enc.w, ent); err != nil {
return err
}
}
enc.fs.Succ(time.Since(start))
return nil
}
// msgAppDecoder is a optimized decoder for append messages. It reads data
// from the Reader and parses it into Entries, then builds messages.
type msgAppDecoder struct {
r io.Reader
local, remote types.ID
term uint64
}
func (dec *msgAppDecoder) decode() (raftpb.Message, error) {
var m raftpb.Message
var l uint64
if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
return m, err
}
if l == 0 {
return linkHeartbeatMessage, nil
}
ents := make([]raftpb.Entry, int(l))
for i := 0; i < int(l); i++ {
ent := &ents[i]
if err := readEntryFrom(dec.r, ent); err != nil {
return m, err
}
}
m = raftpb.Message{
Type: raftpb.MsgApp,
From: uint64(dec.remote),
To: uint64(dec.local),
Term: dec.term,
LogTerm: dec.term,
Index: ents[0].Index - 1,
Entries: ents,
}
return m, nil
}

View File

@ -1,70 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"bytes"
"reflect"
"testing"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
func TestMsgApp(t *testing.T) {
tests := []raftpb.Message{
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 1,
LogTerm: 1,
Index: 3,
Entries: []raftpb.Entry{{Term: 1, Index: 4}},
},
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 1,
LogTerm: 1,
Index: 0,
Entries: []raftpb.Entry{
{Term: 1, Index: 1, Data: []byte("some data")},
{Term: 1, Index: 2, Data: []byte("some data")},
{Term: 1, Index: 3, Data: []byte("some data")},
},
},
linkHeartbeatMessage,
}
for i, tt := range tests {
b := &bytes.Buffer{}
enc := &msgAppEncoder{w: b, fs: &stats.FollowerStats{}}
if err := enc.encode(tt); err != nil {
t.Errorf("#%d: unexpected encode message error: %v", i, err)
continue
}
dec := &msgAppDecoder{r: b, local: types.ID(tt.To), remote: types.ID(tt.From), term: tt.Term}
m, err := dec.decode()
if err != nil {
t.Errorf("#%d: unexpected decode message error: %v", i, err)
continue
}
if !reflect.DeepEqual(m, tt) {
t.Errorf("#%d: message = %+v, want %+v", i, m, tt)
}
}
}

View File

@ -45,7 +45,6 @@ const (
// to hold all proposals.
maxPendingProposals = 4096
streamApp = "streamMsgApp"
streamAppV2 = "streamMsgAppV2"
streamMsg = "streamMsg"
pipelineMsg = "pipeline"
@ -60,8 +59,6 @@ type Peer interface {
send(m raftpb.Message)
// update updates the urls of remote peer.
update(urls types.URLs)
// setTerm sets the term of ongoing communication.
setTerm(term uint64)
// attachOutgoingConn attachs the outgoing connection to the peer for
// stream usage. After the call, the ownership of the outgoing
// connection hands over to the peer. The peer will close the connection
@ -94,17 +91,16 @@ type peer struct {
status *peerStatus
msgAppWriter *streamWriter
writer *streamWriter
pipeline *pipeline
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
msgAppReader *streamReader
msgAppV2Writer *streamWriter
writer *streamWriter
pipeline *pipeline
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
msgAppV2Reader *streamReader
sendc chan raftpb.Message
recvc chan raftpb.Message
propc chan raftpb.Message
newURLsC chan types.URLs
termc chan uint64
// for testing
pausec chan struct{}
@ -114,27 +110,26 @@ type peer struct {
done chan struct{}
}
func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64, v3demo bool) *peer {
func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
picker := newURLPicker(urls)
status := newPeerStatus(to)
p := &peer{
id: to,
r: r,
v3demo: v3demo,
status: status,
msgAppWriter: startStreamWriter(to, status, fs, r),
writer: startStreamWriter(to, status, fs, r),
pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals),
newURLsC: make(chan types.URLs),
termc: make(chan uint64),
pausec: make(chan struct{}),
resumec: make(chan struct{}),
stopc: make(chan struct{}),
done: make(chan struct{}),
id: to,
r: r,
v3demo: v3demo,
status: status,
msgAppV2Writer: startStreamWriter(to, status, fs, r),
writer: startStreamWriter(to, status, fs, r),
pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals),
newURLsC: make(chan types.URLs),
pausec: make(chan struct{}),
resumec: make(chan struct{}),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
// Use go-routine for process of MsgProp because it is
@ -153,8 +148,8 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
}
}()
p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc, term)
reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc, term)
p.msgAppV2Reader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
go func() {
var paused bool
for {
@ -193,11 +188,11 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
paused = false
case <-p.stopc:
cancel()
p.msgAppWriter.stop()
p.msgAppV2Writer.stop()
p.writer.stop()
p.pipeline.stop()
p.snapSender.stop()
p.msgAppReader.stop()
p.msgAppV2Reader.stop()
reader.stop()
close(p.done)
return
@ -222,13 +217,11 @@ func (p *peer) update(urls types.URLs) {
}
}
func (p *peer) setTerm(term uint64) { p.msgAppReader.updateMsgAppTerm(term) }
func (p *peer) attachOutgoingConn(conn *outgoingConn) {
var ok bool
switch conn.t {
case streamTypeMsgApp, streamTypeMsgAppV2:
ok = p.msgAppWriter.attach(conn)
case streamTypeMsgAppV2:
ok = p.msgAppV2Writer.attach(conn)
case streamTypeMessage:
ok = p.writer.attach(conn)
default:
@ -271,12 +264,14 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri
// stream for a long time, only use one of the N pipelines to send MsgSnap.
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) {
return writec, streamApp
} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
return writec, streamAppV2
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
return p.pipeline.msgc, pipelineMsg
}
func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

View File

@ -35,7 +35,7 @@ func TestPeerPick(t *testing.T) {
{
true, true,
raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1},
streamApp,
streamAppV2,
},
{
true, true,
@ -75,9 +75,9 @@ func TestPeerPick(t *testing.T) {
}
for i, tt := range tests {
peer := &peer{
msgAppWriter: &streamWriter{working: tt.msgappWorking},
writer: &streamWriter{working: tt.messageWorking},
pipeline: &pipeline{},
msgAppV2Writer: &streamWriter{working: tt.msgappWorking},
writer: &streamWriter{working: tt.messageWorking},
pipeline: &pipeline{},
}
_, picked := peer.pick(tt.m)
if picked != tt.wpicked {

View File

@ -21,7 +21,6 @@ import (
"net"
"net/http"
"path"
"strconv"
"strings"
"sync"
"time"
@ -37,7 +36,6 @@ import (
const (
streamTypeMessage streamType = "message"
streamTypeMsgAppV2 streamType = "msgappv2"
streamTypeMsgApp streamType = "msgapp"
streamBufSize = 4096
)
@ -47,9 +45,9 @@ var (
// the key is in string format "major.minor.patch"
supportedStream = map[string][]streamType{
"2.0.0": {streamTypeMsgApp},
"2.1.0": {streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgApp, streamTypeMsgAppV2, streamTypeMessage},
"2.0.0": {},
"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
}
)
@ -57,8 +55,6 @@ type streamType string
func (t streamType) endpoint() string {
switch t {
case streamTypeMsgApp: // for backward compatibility of v2.0
return RaftStreamPrefix
case streamTypeMsgAppV2:
return path.Join(RaftStreamPrefix, "msgapp")
case streamTypeMessage:
@ -71,8 +67,6 @@ func (t streamType) endpoint() string {
func (t streamType) String() string {
switch t {
case streamTypeMsgApp:
return "stream MsgApp"
case streamTypeMsgAppV2:
return "stream MsgApp v2"
case streamTypeMessage:
@ -94,8 +88,7 @@ func isLinkHeartbeatMessage(m raftpb.Message) bool {
}
type outgoingConn struct {
t streamType
termStr string
t streamType
io.Writer
http.Flusher
io.Closer
@ -138,7 +131,6 @@ func (cw *streamWriter) run() {
var msgc chan raftpb.Message
var heartbeatc <-chan time.Time
var t streamType
var msgAppTerm uint64
var enc encoder
var flusher http.Flusher
tickc := time.Tick(ConnReadTimeout / 3)
@ -158,16 +150,6 @@ func (cw *streamWriter) run() {
flusher.Flush()
reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
case m := <-msgc:
if t == streamTypeMsgApp && m.Term != msgAppTerm {
// TODO: reasonable retry logic
if m.Term > msgAppTerm {
cw.close()
heartbeatc, msgc = nil, nil
// TODO: report to raft at peer level
cw.r.ReportUnreachable(m.To)
}
continue
}
start := time.Now()
if err := enc.encode(m); err != nil {
reportSentFailure(string(t), m)
@ -184,13 +166,6 @@ func (cw *streamWriter) run() {
cw.close()
t = conn.t
switch conn.t {
case streamTypeMsgApp:
var err error
msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64)
if err != nil {
plog.Panicf("could not parse term %s to uint (%v)", conn.termStr, err)
}
enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
case streamTypeMsgAppV2:
enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
case streamTypeMessage:
@ -260,29 +235,27 @@ type streamReader struct {
propc chan<- raftpb.Message
errorc chan<- error
mu sync.Mutex
msgAppTerm uint64
cancel func()
closer io.Closer
stopc chan struct{}
done chan struct{}
mu sync.Mutex
cancel func()
closer io.Closer
stopc chan struct{}
done chan struct{}
}
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error, term uint64) *streamReader {
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
r := &streamReader{
tr: tr,
picker: picker,
t: t,
local: local,
remote: remote,
cid: cid,
status: status,
recvc: recvc,
propc: propc,
errorc: errorc,
msgAppTerm: term,
stopc: make(chan struct{}),
done: make(chan struct{}),
tr: tr,
picker: picker,
t: t,
local: local,
remote: remote,
cid: cid,
status: status,
recvc: recvc,
propc: propc,
errorc: errorc,
stopc: make(chan struct{}),
done: make(chan struct{}),
}
go r.run()
return r
@ -292,12 +265,6 @@ func (cr *streamReader) run() {
for {
t := cr.t
rc, err := cr.dial(t)
// downgrade to streamTypeMsgApp if the remote doesn't support
// streamTypeMsgAppV2
if t == streamTypeMsgAppV2 && err == errUnsupportedStreamType {
t = streamTypeMsgApp
rc, err = cr.dial(t)
}
if err != nil {
if err != errUnsupportedStreamType {
cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
@ -310,9 +277,6 @@ func (cr *streamReader) run() {
case err == io.EOF:
// connection is closed by the remote
case isClosedConnectionError(err):
// stream msgapp is only used for etcd 2.0, and etcd 2.0 doesn't
// heartbeat on the idle stream, so it is expected to time out.
case t == streamTypeMsgApp && isNetworkTimeoutError(err):
default:
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
}
@ -332,8 +296,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
var dec decoder
cr.mu.Lock()
switch t {
case streamTypeMsgApp:
dec = &msgAppDecoder{r: rc, local: cr.local, remote: cr.remote, term: cr.msgAppTerm}
case streamTypeMsgAppV2:
dec = newMsgAppV2Decoder(rc, cr.local, cr.remote)
case streamTypeMessage:
@ -377,20 +339,6 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
}
}
// updateMsgAppTerm updates the term for MsgApp stream, and closes
// the existing MsgApp stream if term is updated.
func (cr *streamReader) updateMsgAppTerm(term uint64) {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.msgAppTerm >= term {
return
}
cr.msgAppTerm = term
if cr.t == streamTypeMsgApp {
cr.close()
}
}
func (cr *streamReader) stop() {
close(cr.stopc)
cr.mu.Lock()
@ -410,10 +358,6 @@ func (cr *streamReader) isWorking() bool {
func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
u := cr.picker.pick()
cr.mu.Lock()
term := cr.msgAppTerm
cr.mu.Unlock()
uu := u
uu.Path = path.Join(t.endpoint(), cr.local.String())
@ -427,9 +371,6 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
req.Header.Set("X-Raft-To", cr.remote.String())
if t == streamTypeMsgApp {
req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
}
cr.mu.Lock()
select {
@ -500,10 +441,6 @@ func (cr *streamReader) close() {
cr.closer = nil
}
func canUseMsgAppStream(m raftpb.Message) bool {
return m.Type == raftpb.MsgApp && m.Term == m.LogTerm
}
func isClosedConnectionError(err error) bool {
operr, ok := err.(*net.OpError)
return ok && operr.Err.Error() == "use of closed network connection"

View File

@ -98,15 +98,14 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
}
func TestStreamReaderDialRequest(t *testing.T) {
for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage, streamTypeMsgAppV2} {
for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
tr := &roundTripperRecorder{}
sr := &streamReader{
tr: tr,
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
local: types.ID(1),
remote: types.ID(2),
cid: types.ID(1),
msgAppTerm: 1,
tr: tr,
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
local: types.ID(1),
remote: types.ID(2),
cid: types.ID(1),
}
sr.dial(tt)
@ -124,9 +123,6 @@ func TestStreamReaderDialRequest(t *testing.T) {
if g := req.Header.Get("X-Raft-To"); g != "2" {
t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
}
if g := req.Header.Get("X-Raft-Term"); tt == streamTypeMsgApp && g != "1" {
t.Errorf("#%d: header X-Raft-Term = %s, want 1", i, g)
}
}
}
@ -173,41 +169,6 @@ func TestStreamReaderDialResult(t *testing.T) {
}
}
func TestStreamReaderUpdateMsgAppTerm(t *testing.T) {
term := uint64(2)
tests := []struct {
term uint64
typ streamType
wterm uint64
wclose bool
}{
// lower term
{1, streamTypeMsgApp, 2, false},
// unchanged term
{2, streamTypeMsgApp, 2, false},
// higher term
{3, streamTypeMessage, 3, false},
{3, streamTypeMsgAppV2, 3, false},
// higher term, reset closer
{3, streamTypeMsgApp, 3, true},
}
for i, tt := range tests {
closer := &fakeWriteFlushCloser{}
cr := &streamReader{
msgAppTerm: term,
t: tt.typ,
closer: closer,
}
cr.updateMsgAppTerm(tt.term)
if cr.msgAppTerm != tt.wterm {
t.Errorf("#%d: term = %d, want %d", i, cr.msgAppTerm, tt.wterm)
}
if closer.closed != tt.wclose {
t.Errorf("#%d: closed = %v, want %v", i, closer.closed, tt.wclose)
}
}
}
// TestStreamReaderDialDetectUnsupport tests that dial func could find
// out that the stream type is not supported by the remote.
func TestStreamReaderDialDetectUnsupport(t *testing.T) {
@ -248,32 +209,22 @@ func TestStream(t *testing.T) {
}
tests := []struct {
t streamType
term uint64
m raftpb.Message
wc chan raftpb.Message
t streamType
m raftpb.Message
wc chan raftpb.Message
}{
{
streamTypeMessage,
0,
raftpb.Message{Type: raftpb.MsgProp, To: 2},
propc,
},
{
streamTypeMessage,
0,
msgapp,
recvc,
},
{
streamTypeMsgApp,
1,
msgapp,
recvc,
},
{
streamTypeMsgAppV2,
0,
msgapp,
recvc,
},
@ -288,7 +239,7 @@ func TestStream(t *testing.T) {
h.sw = sw
picker := mustNewURLPicker(t, []string{srv.URL})
sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil, tt.term)
sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil)
defer sr.stop()
// wait for stream to work
var writec chan<- raftpb.Message
@ -321,27 +272,21 @@ func TestCheckStreamSupport(t *testing.T) {
}{
// support
{
semver.Must(semver.NewVersion("2.0.0")),
streamTypeMsgApp,
semver.Must(semver.NewVersion("2.1.0")),
streamTypeMsgAppV2,
true,
},
// ignore patch
{
semver.Must(semver.NewVersion("2.0.9")),
streamTypeMsgApp,
semver.Must(semver.NewVersion("2.1.9")),
streamTypeMsgAppV2,
true,
},
// ignore prerelease
{
semver.Must(semver.NewVersion("2.0.0-alpha")),
streamTypeMsgApp,
true,
},
// not support
{
semver.Must(semver.NewVersion("2.0.0")),
semver.Must(semver.NewVersion("2.1.0-alpha")),
streamTypeMsgAppV2,
false,
true,
},
}
for i, tt := range tests {
@ -378,7 +323,6 @@ func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := newCloseNotifier()
h.sw.attach(&outgoingConn{
t: h.t,
termStr: r.Header.Get("X-Raft-Term"),
Writer: w,
Flusher: w.(http.Flusher),
Closer: c,

View File

@ -125,8 +125,7 @@ type Transport struct {
streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines
mu sync.RWMutex // protect the term, remote and peer map
term uint64 // the latest term that has been observed
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
@ -173,18 +172,6 @@ func (t *Transport) Get(id types.ID) Peer {
return t.peers[id]
}
func (t *Transport) maybeUpdatePeersTerm(term uint64) {
t.mu.Lock()
defer t.mu.Unlock()
if t.term >= term {
return
}
t.term = term
for _, p := range t.peers {
p.setTerm(term)
}
}
func (t *Transport) Send(msgs []raftpb.Message) {
for _, m := range msgs {
if m.To == 0 {
@ -193,12 +180,6 @@ func (t *Transport) Send(msgs []raftpb.Message) {
}
to := types.ID(m.To)
// update terms for all the peers
// ignore MsgProp since it does not have a valid term
if m.Type != raftpb.MsgProp {
t.maybeUpdatePeersTerm(m.Term)
}
p, ok := t.peers[to]
if ok {
if m.Type == raftpb.MsgApp {
@ -258,7 +239,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.term, t.V3demo)
t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.V3demo)
addPeerToProber(t.prober, id.String(), us)
}

View File

@ -68,11 +68,9 @@ func TestTransportSend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
term := uint64(10)
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
term: term,
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
}
@ -95,10 +93,6 @@ func TestTransportAdd(t *testing.T) {
}
tr.Stop()
if g := s.(*peer).msgAppReader.msgAppTerm; g != term {
t.Errorf("peer.term = %d, want %d", g, term)
}
}
func TestTransportRemove(t *testing.T) {