etcdserver: fix cluster fallback recovery

Cluster and transport may be recovered to old states when a new node joins
the cluster. Record the cluster's last modified index to avoid this.
Yicheng Qin 2015-02-20 13:51:38 -08:00
parent 0ea2173a7e
commit cff005777a
7 changed files with 45 additions and 13 deletions
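
Taken together, the change propagates the remote cluster's raft index to a joining node over HTTP: the serving peer writes its current raft index into an X-Raft-Index response header on the peer members endpoint, and the joining node parses that header and records the value on the cluster it bootstraps from the remote member list. Below is a minimal, self-contained sketch of that round trip; serveMembers and fetchRemoteIndex are illustrative stand-ins, not code from this commit.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// serveMembers stands in for etcd's peer members handler: it reports the
// server's current raft index in the X-Raft-Index response header.
func serveMembers(raftIndex uint64) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("X-Raft-Index", strconv.FormatUint(raftIndex, 10))
		w.Write([]byte("[]")) // member list omitted in this sketch
	}
}

// fetchRemoteIndex stands in for the joining side: it reads the header and
// falls back to 0 when an older peer does not send it.
func fetchRemoteIndex(url string) (uint64, error) {
	resp, err := http.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	s := resp.Header.Get("X-Raft-Index")
	if s == "" {
		return 0, nil // header missing: keep the pre-commit behavior
	}
	return strconv.ParseUint(s, 10, 64)
}

func main() {
	srv := httptest.NewServer(serveMembers(42))
	defer srv.Close()

	idx, err := fetchRemoteIndex(srv.URL)
	fmt.Println(idx, err) // 42 <nil>
}

As in the commit, a missing header simply leaves the index at 0, which is what keeps the change compatible with peers at or before v2.0.3.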

View File

@@ -173,7 +173,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		Handler: etcdhttp.NewClientHandler(s),
 		Info:    cfg.corsInfo,
 	}
-	ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
+	ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
 	// Start the peer server in a goroutine
 	for _, l := range plns {
 		go func(l net.Listener) {

View File

@@ -59,6 +59,12 @@ type Cluster struct {
 	id    types.ID
 	token string
 	store store.Store
+	// index is the raft index that cluster is updated at bootstrap
+	// from remote cluster info.
+	// It may have a higher value than local raft index, because it
+	// displays a further view of the cluster.
+	// TODO: upgrade it as last modified index
+	index uint64
 
 	sync.Mutex // guards members and removed map
 	members    map[types.ID]*Member
@@ -230,6 +236,8 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
 
 func (c *Cluster) SetStore(st store.Store) { c.store = st }
 
+func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
+
 func (c *Cluster) Recover() {
 	c.members, c.removed = membersFromStore(c.store)
 }

View File

@@ -21,6 +21,7 @@ import (
 	"log"
 	"net/http"
 	"sort"
+	"strconv"
 	"time"
 
 	"github.com/coreos/etcd/pkg/types"
@@ -88,7 +89,21 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
 			}
 			continue
 		}
-		return NewClusterFromMembers("", id, membs), nil
+		var index uint64
+		// The header at or before v2.0.3 doesn't have this field. For backward
+		// compatibility, it checks whether the field exists.
+		if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
+			index, err = strconv.ParseUint(indexStr, 10, 64)
+			if err != nil {
+				if logerr {
+					log.Printf("etcdserver: could not parse raft index: %v", err)
+				}
+				continue
+			}
+		}
+		cl := NewClusterFromMembers("", id, membs)
+		cl.UpdateIndex(index)
+		return cl, nil
 	}
 	return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
 }

View File

@@ -18,6 +18,7 @@ import (
 	"encoding/json"
 	"log"
 	"net/http"
+	"strconv"
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/rafthttp"
@@ -28,9 +29,10 @@ const (
 )
 
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
-func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
+func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
 	mh := &peerMembersHandler{
 		clusterInfo: clusterInfo,
+		timer:       timer,
 	}
 
 	mux := http.NewServeMux()
@@ -43,6 +45,7 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler
 
 type peerMembersHandler struct {
 	clusterInfo etcdserver.ClusterInfo
+	timer       etcdserver.RaftTimer
 }
 
 func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -50,6 +53,7 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
+	w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
 
 	if r.URL.Path != peerMembersPrefix {
 		http.Error(w, "bad path", http.StatusBadRequest)

View File

@@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
 	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte("test data"))
 	})
-	ph := NewPeerHandler(&fakeCluster{}, h)
+	ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
 	srv := httptest.NewServer(ph)
 	defer srv.Close()
 
@@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
 		id:      1,
 		members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
 	}
-	h := &peerMembersHandler{clusterInfo: cluster}
+	h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
 	msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
 	if err != nil {
 		t.Fatal(err)

View File

@@ -164,6 +164,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
 			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
 		}
+		cfg.Cluster.UpdateIndex(existingCluster.index)
 		cfg.Cluster.SetID(existingCluster.id)
 		cfg.Cluster.SetStore(st)
 		cfg.Print()
@@ -393,15 +394,19 @@ func (s *EtcdServer) run() {
 				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
 					log.Panicf("recovery store error: %v", err)
 				}
-				s.Cluster.Recover()
 
-				// recover raft transport
-				s.r.transport.RemoveAllPeers()
-				for _, m := range s.Cluster.Members() {
-					if m.ID == s.ID() {
-						continue
+				// It avoids snapshot recovery overwriting newer cluster and
+				// transport setting, which may block the communication.
+				if s.Cluster.index < rd.Snapshot.Metadata.Index {
+					s.Cluster.Recover()
+					// recover raft transport
+					s.r.transport.RemoveAllPeers()
+					for _, m := range s.Cluster.Members() {
+						if m.ID == s.ID() {
+							continue
+						}
+						s.r.transport.AddPeer(m.ID, m.PeerURLs)
 					}
-					s.r.transport.AddPeer(m.ID, m.PeerURLs)
 				}
 
 				appliedi = rd.Snapshot.Metadata.Index
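
The guard above is the consumer of the recorded index. Shown with concrete numbers under assumed values (shouldRecover is an illustrative helper, not part of the commit): a node that bootstrapped with a remote cluster index of 10 skips recovery from an older snapshot at index 5, still recovers from a newer snapshot at index 20, and a node that learned no remote index (0) always recovers, matching the old behavior.

package main

import "fmt"

// shouldRecover mirrors the new check in run(): only rebuild the cluster
// membership and raft transport from the snapshot if the snapshot is newer
// than the cluster view recorded at bootstrap.
func shouldRecover(clusterIndex, snapshotIndex uint64) bool {
	return clusterIndex < snapshotIndex
}

func main() {
	fmt.Println(shouldRecover(10, 5))  // false: keep membership learned when joining
	fmt.Println(shouldRecover(10, 20)) // true: snapshot is newer, recover from it
	fmt.Println(shouldRecover(0, 5))   // true: no remote index known, old behavior
}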

View File

@@ -526,7 +526,7 @@ func (m *member) Launch() error {
 	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
 	m.s.Start()
 
-	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
+	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
 
 	for _, ln := range m.PeerListeners {
 		hs := &httptest.Server{