Merge pull request #2348 from yichengq/326

etcdserver: fix cluster fallback recovery
This commit is contained in:
Xiang Li 2015-02-21 12:16:08 -08:00
commit 4d0472029a
7 changed files with 45 additions and 13 deletions

View File

@ -173,7 +173,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Handler: etcdhttp.NewClientHandler(s), Handler: etcdhttp.NewClientHandler(s),
Info: cfg.corsInfo, Info: cfg.corsInfo,
} }
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler()) ph := etcdhttp.NewPeerHandler(s.Cluster, etcdserver.RaftTimer(s), s.RaftHandler())
// Start the peer server in a goroutine // Start the peer server in a goroutine
for _, l := range plns { for _, l := range plns {
go func(l net.Listener) { go func(l net.Listener) {

View File

@ -59,6 +59,12 @@ type Cluster struct {
id types.ID id types.ID
token string token string
store store.Store store store.Store
// index is the raft index that cluster is updated at bootstrap
// from remote cluster info.
// It may have a higher value than local raft index, because it
// reflects a more recent view of the cluster.
// TODO: upgrade it as last modified index
index uint64
sync.Mutex // guards members and removed map sync.Mutex // guards members and removed map
members map[types.ID]*Member members map[types.ID]*Member
@ -230,6 +236,8 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st } func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
func (c *Cluster) Recover() { func (c *Cluster) Recover() {
c.members, c.removed = membersFromStore(c.store) c.members, c.removed = membersFromStore(c.store)
} }

View File

@ -21,6 +21,7 @@ import (
"log" "log"
"net/http" "net/http"
"sort" "sort"
"strconv"
"time" "time"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
@ -88,7 +89,21 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
} }
continue continue
} }
return NewClusterFromMembers("", id, membs), nil var index uint64
// Headers from versions at or before v2.0.3 don't have this field. For
// backward compatibility, check whether the field exists before parsing.
if indexStr := resp.Header.Get("X-Raft-Index"); indexStr != "" {
index, err = strconv.ParseUint(indexStr, 10, 64)
if err != nil {
if logerr {
log.Printf("etcdserver: could not parse raft index: %v", err)
}
continue
}
}
cl := NewClusterFromMembers("", id, membs)
cl.UpdateIndex(index)
return cl, nil
} }
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls") return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
} }

View File

@ -18,6 +18,7 @@ import (
"encoding/json" "encoding/json"
"log" "log"
"net/http" "net/http"
"strconv"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/rafthttp"
@ -28,9 +29,10 @@ const (
) )
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler { func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, timer etcdserver.RaftTimer, raftHandler http.Handler) http.Handler {
mh := &peerMembersHandler{ mh := &peerMembersHandler{
clusterInfo: clusterInfo, clusterInfo: clusterInfo,
timer: timer,
} }
mux := http.NewServeMux() mux := http.NewServeMux()
@ -43,6 +45,7 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler
type peerMembersHandler struct { type peerMembersHandler struct {
clusterInfo etcdserver.ClusterInfo clusterInfo etcdserver.ClusterInfo
timer etcdserver.RaftTimer
} }
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -50,6 +53,7 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Raft-Index", strconv.FormatUint(h.timer.Index(), 10))
if r.URL.Path != peerMembersPrefix { if r.URL.Path != peerMembersPrefix {
http.Error(w, "bad path", http.StatusBadRequest) http.Error(w, "bad path", http.StatusBadRequest)

View File

@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data")) w.Write([]byte("test data"))
}) })
ph := NewPeerHandler(&fakeCluster{}, h) ph := NewPeerHandler(&fakeCluster{}, &dummyRaftTimer{}, h)
srv := httptest.NewServer(ph) srv := httptest.NewServer(ph)
defer srv.Close() defer srv.Close()
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
id: 1, id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
} }
h := &peerMembersHandler{clusterInfo: cluster} h := &peerMembersHandler{clusterInfo: cluster, timer: &dummyRaftTimer{}}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -164,6 +164,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil { if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
} }
cfg.Cluster.UpdateIndex(existingCluster.index)
cfg.Cluster.SetID(existingCluster.id) cfg.Cluster.SetID(existingCluster.id)
cfg.Cluster.SetStore(st) cfg.Cluster.SetStore(st)
cfg.Print() cfg.Print()
@ -393,8 +394,11 @@ func (s *EtcdServer) run() {
if err := s.store.Recovery(rd.Snapshot.Data); err != nil { if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err) log.Panicf("recovery store error: %v", err)
} }
s.Cluster.Recover()
// This avoids snapshot recovery from overwriting newer cluster and
// transport settings, which could block communication.
if s.Cluster.index < rd.Snapshot.Metadata.Index {
s.Cluster.Recover()
// recover raft transport // recover raft transport
s.r.transport.RemoveAllPeers() s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() { for _, m := range s.Cluster.Members() {
@ -403,6 +407,7 @@ func (s *EtcdServer) run() {
} }
s.r.transport.AddPeer(m.ID, m.PeerURLs) s.r.transport.AddPeer(m.ID, m.PeerURLs)
} }
}
appliedi = rd.Snapshot.Metadata.Index appliedi = rd.Snapshot.Metadata.Index
confState = rd.Snapshot.Metadata.ConfState confState = rd.Snapshot.Metadata.ConfState

View File

@ -526,7 +526,7 @@ func (m *member) Launch() error {
m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start() m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())} m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s, m.s.RaftHandler())}
for _, ln := range m.PeerListeners { for _, ln := range m.PeerListeners {
hs := &httptest.Server{ hs := &httptest.Server{