From fb4b0b5cf075212f79fcdfa58533915a29c0912d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 27 Jun 2015 23:21:19 -0700 Subject: [PATCH] rafthttp: support to init term when adding peer So it doesn't need to build term-0 stream with the remote first, then update it. --- rafthttp/peer.go | 6 +++--- rafthttp/stream.go | 27 ++++++++++++++------------- rafthttp/stream_test.go | 2 +- rafthttp/transport.go | 4 +--- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 8509ac5df..95ab2db4b 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -107,7 +107,7 @@ type peer struct { done chan struct{} } -func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { +func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64) *peer { picker := newURLPicker(urls) status := newPeerStatus(to) p := &peer{ @@ -143,8 +143,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r } }() - p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) - reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) + p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc, term) + reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc, term) go func() { var paused bool for { diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 470605308..9f979f358 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -266,20 +266,21 @@ type streamReader struct { done chan struct{} } -func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { +func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error, term uint64) *streamReader { r := &streamReader{ - tr: tr, - picker: picker, - t: t, - local: local, - remote: remote, - cid: cid, - status: status, - recvc: recvc, - propc: propc, - errorc: errorc, - stopc: make(chan struct{}), - done: make(chan struct{}), + tr: tr, + picker: picker, + t: t, + local: local, + remote: remote, + cid: cid, + status: status, + recvc: recvc, + propc: propc, + errorc: errorc, + msgAppTerm: term, + stopc: make(chan struct{}), + done: make(chan struct{}), } go r.run() return r diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index cb892a4a5..48de09ab0 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -274,7 +274,7 @@ func TestStream(t *testing.T) { h.sw = sw picker := mustNewURLPicker(t, []string{srv.URL}) - sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil) + sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil, 1) defer sr.stop() if tt.t == streamTypeMsgApp { sr.updateMsgAppTerm(tt.term) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 3558004b3..d3b9607fa 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -194,9 +194,7 @@ func (t *transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.leaderStats.Follower(id.String()) - p := startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc) - p.setTerm(t.term) - t.peers[id] = p + t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc, t.term) } func (t *transport) RemovePeer(id types.ID) {