From e25e36832123606c8687f50fec2d818767f6dd4f Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 22 Jun 2015 20:49:55 -0700 Subject: [PATCH] rafthttp: update term when AddPeer Update term when AddPeer, or the term in peer will not be updated until the term is changed. This fixes the log flood happended when the v2.1 follower applies the snapshot from v2.0 leader: ``` rafthttp: cannot attach out of data stream server [0 / 17] ``` or ``` rafthttp: server streaming to 6e3bd23ae5f1eae0 at term 0 has been stopped ``` --- rafthttp/transport.go | 8 ++++++-- rafthttp/transport_test.go | 10 +++++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 8c7288061..3558004b3 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -79,8 +79,8 @@ type transport struct { serverStats *stats.ServerStats leaderStats *stats.LeaderStats + mu sync.RWMutex // protect the term, remote and peer map term uint64 // the latest term that has been observed - mu sync.RWMutex // protect the remote and peer map remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map errorc chan error @@ -116,6 +116,8 @@ func (t *transport) Get(id types.ID) Peer { } func (t *transport) maybeUpdatePeersTerm(term uint64) { + t.mu.Lock() + defer t.mu.Unlock() if t.term >= term { return } @@ -192,7 +194,9 @@ func (t *transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.leaderStats.Follower(id.String()) - t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc) + p := startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc) + p.setTerm(t.term) + t.peers[id] = p } func (t *transport) RemovePeer(id types.ID) { diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index 61043c69b..6c4a3a37c 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -67,19 +67,21 @@ func TestTransportSend(t *testing.T) { func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") + term := uint64(10) tr := &transport{ roundTripper: &roundTripperRecorder{}, leaderStats: ls, + term: term, peers: make(map[types.ID]Peer), } tr.AddPeer(1, []string{"http://localhost:2380"}) - defer tr.Stop() if _, ok := ls.Followers["1"]; !ok { t.Errorf("FollowerStats[1] is nil, want exists") } s, ok := tr.peers[types.ID(1)] if !ok { + tr.Stop() t.Fatalf("senders[1] is nil, want exists") } @@ -89,6 +91,12 @@ func TestTransportAdd(t *testing.T) { if s != ns { t.Errorf("sender = %v, want %v", ns, s) } + + tr.Stop() + + if g := s.(*peer).msgAppReader.msgAppTerm; g != term { + t.Errorf("peer.term = %d, want %d", g, term) + } } func TestTransportRemove(t *testing.T) {