From c16cc3a6a36436d8c8049172ed0e2d8fc7977ebf Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 12 Feb 2015 10:48:06 -0800 Subject: [PATCH] etcdserver: recover transport when recovering from a snapshot --- etcdserver/server.go | 10 ++++++++++ etcdserver/server_test.go | 1 + rafthttp/transport.go | 14 ++++++++++++++ 3 files changed, 25 insertions(+) diff --git a/etcdserver/server.go b/etcdserver/server.go index c0fc2ffcc..0249506f7 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -392,6 +392,16 @@ func (s *EtcdServer) run() { log.Panicf("recovery store error: %v", err) } s.Cluster.Recover() + + // recover raft transport + s.r.transport.RemoveAllPeers() + for _, m := range s.Cluster.Members() { + if m.ID == s.ID() { + continue + } + s.r.transport.AddPeer(m.ID, m.PeerURLs) + } + appliedi = rd.Snapshot.Metadata.Index confState = rd.Snapshot.Metadata.ConfState log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 50dd76a0a..c7a8308ac 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1393,6 +1393,7 @@ func (s *nopTransporter) Handler() http.Handler { return nil } func (s *nopTransporter) Send(m []raftpb.Message) {} func (s *nopTransporter) AddPeer(id types.ID, us []string) {} func (s *nopTransporter) RemovePeer(id types.ID) {} +func (s *nopTransporter) RemoveAllPeers() {} func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} func (s *nopTransporter) Stop() {} func (s *nopTransporter) Pause() {} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index dc3e25c0c..3196f0bda 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -37,6 +37,7 @@ type Transporter interface { Send(m []raftpb.Message) AddPeer(id types.ID, urls []string) RemovePeer(id types.ID) + RemoveAllPeers() UpdatePeer(id types.ID, urls []string) Stop() } @@ -132,6 +133,19 @@ func (t *transport) AddPeer(id types.ID, urls []string) { func (t *transport) RemovePeer(id types.ID) { t.mu.Lock() defer t.mu.Unlock() + t.removePeer(id) +} + +func (t *transport) RemoveAllPeers() { + t.mu.Lock() + defer t.mu.Unlock() + for id, _ := range t.peers { + t.removePeer(id) + } +} + +// the caller of this function must have the peers mutex. +func (t *transport) removePeer(id types.ID) { if peer, ok := t.peers[id]; ok { peer.Stop() } else {