From cff005777a40bcf3a5bea3e87387273afe054ce1 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 20 Feb 2015 13:51:38 -0800 Subject: [PATCH] etcdserver: fix cluster fallback recovery Cluster and transport may recover to old states when new node joins the cluster. Record cluster last modified index to avoid this. --- etcdmain/etcd.go | 2 +- etcdserver/cluster.go | 8 ++++++++ etcdserver/cluster_util.go | 17 ++++++++++++++++- etcdserver/etcdhttp/peer.go | 6 +++++- etcdserver/etcdhttp/peer_test.go | 4 ++-- etcdserver/server.go | 19 ++++++++++++------- integration/cluster_test.go | 2 +- 7 files changed, 45 insertions(+), 13 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 8a58768d5..904c456f3 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -173,7 +173,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Handler: etcdhttp.NewClientHandler(s), Info: cfg.corsInfo, } - ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler()) + ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler()) // Start the peer server in a goroutine for _, l := range plns { go func(l net.Listener) { diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 908aeed29..1b94342e0 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -59,6 +59,12 @@ type Cluster struct { id types.ID token string store store.Store + // index is the raft index at which the cluster was updated at bootstrap + // from remote cluster info. + // It may be higher than the local raft index, because the remote info + // reflects a more advanced view of the cluster. 
+ // TODO: upgrade it as last modified index + index uint64 sync.Mutex // guards members and removed map members map[types.ID]*Member @@ -230,6 +236,8 @@ func (c *Cluster) SetID(id types.ID) { c.id = id } func (c *Cluster) SetStore(st store.Store) { c.store = st } +func (c *Cluster) UpdateIndex(index uint64) { c.index = index } + func (c *Cluster) Recover() { c.members, c.removed = membersFromStore(c.store) } diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 9cc0a72e2..1792769ef 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -21,6 +21,7 @@ import ( "log" "net/http" "sort" + "strconv" "time" "github.com/coreos/etcd/pkg/types" @@ -88,7 +89,21 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) ( } continue } - return NewClusterFromMembers("", id, membs), nil + var index uint64 + // The header at or before v2.0.3 doesn't have this field. For backward + // compatibility, it checks whether the field exists. + if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" { + index, err = strconv.ParseUint(indexStr, 10, 64) + if err != nil { + if logerr { + log.Printf("etcdserver: could not parse raft index: %v", err) + } + continue + } + } + cl := NewClusterFromMembers("", id, membs) + cl.UpdateIndex(index) + return cl, nil } return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls") } diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index 23d7471ff..0b2b47707 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -18,6 +18,7 @@ import ( "encoding/json" "log" "net/http" + "strconv" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/rafthttp" @@ -28,9 +29,10 @@ const ( ) // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. 
-func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler { +func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler { mh := &peerMembersHandler{ clusterInfo: clusterInfo, + timer: timer, } mux := http.NewServeMux() @@ -43,6 +45,7 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler type peerMembersHandler struct { clusterInfo etcdserver.ClusterInfo + timer etcdserver.RaftTimer } func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -50,6 +53,7 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) + w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10)) if r.URL.Path != peerMembersPrefix { http.Error(w, "bad path", http.StatusBadRequest) diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go index cff85f35b..850dd6677 100644 --- a/etcdserver/etcdhttp/peer_test.go +++ b/etcdserver/etcdhttp/peer_test.go @@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) { h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("test data")) }) - ph := NewPeerHandler(&fakeCluster{}, h) + ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h) srv := httptest.NewServer(ph) defer srv.Close() @@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) { id: 1, members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, } - h := &peerMembersHandler{clusterInfo: cluster} + h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}} msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) if err != nil { t.Fatal(err) diff --git a/etcdserver/server.go b/etcdserver/server.go index c2b886701..bfd663a9f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -164,6 +164,7 @@ func NewServer(cfg *ServerConfig) 
(*EtcdServer, error) { if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } + cfg.Cluster.UpdateIndex(existingCluster.index) cfg.Cluster.SetID(existingCluster.id) cfg.Cluster.SetStore(st) cfg.Print() @@ -393,15 +394,19 @@ func (s *EtcdServer) run() { if err := s.store.Recovery(rd.Snapshot.Data); err != nil { log.Panicf("recovery store error: %v", err) } - s.Cluster.Recover() - // recover raft transport - s.r.transport.RemoveAllPeers() - for _, m := range s.Cluster.Members() { - if m.ID == s.ID() { - continue + // Recover only from snapshots newer than the cluster index, so stale ones + // cannot overwrite newer cluster/transport settings and block communication. + if s.Cluster.index < rd.Snapshot.Metadata.Index { + s.Cluster.Recover() + // recover raft transport + s.r.transport.RemoveAllPeers() + for _, m := range s.Cluster.Members() { + if m.ID == s.ID() { + continue + } + s.r.transport.AddPeer(m.ID, m.PeerURLs) } - s.r.transport.AddPeer(m.ID, m.PeerURLs) } appliedi = rd.Snapshot.Metadata.Index diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 4f5e7db83..b7d7c7715 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -526,7 +526,7 @@ func (m *member) Launch() error { m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() - m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())} + m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())} for _, ln := range m.PeerListeners { hs := &httptest.Server{