From 08593bcdf6978464a56a2126590dd19ce8b3f429 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 23 Oct 2014 16:41:58 -0700 Subject: [PATCH] etcdserver: support newly-join member bootstrap --- etcdserver/cluster.go | 37 ++++++++++++++++++++ etcdserver/cluster_state.go | 4 ++- etcdserver/cluster_test.go | 70 +++++++++++++++++++++++++++++++++++++ etcdserver/member.go | 8 +++++ etcdserver/server.go | 51 ++++++++++++++++++++++++--- 5 files changed, 164 insertions(+), 6 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 6fa081c10..03996b868 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -24,6 +24,7 @@ import ( "log" "net/url" "path" + "reflect" "sort" "strings" @@ -118,6 +119,15 @@ func NewClusterFromStore(name string, st store.Store) *Cluster { return c } +func NewClusterFromMembers(name string, id uint64, membs []*Member) *Cluster { + c := newCluster(name) + c.id = id + for _, m := range membs { + c.members[m.ID] = m + } + return c +} + func newCluster(name string) *Cluster { return &Cluster{ name: name, @@ -214,6 +224,33 @@ func (c Cluster) String() string { return strings.Join(sl, ",") } +// ValidateAndAssignIDs validates the given members by matching their PeerURLs +// with the existing members in the cluster. If the validation succeeds, it +// assigns the IDs from the given members to the existing members in the +// cluster. If the validation fails, an error will be returned. +func (c *Cluster) ValidateAndAssignIDs(membs []*Member) error { + if len(c.members) != len(membs) { + return fmt.Errorf("cannot update %v from %v because the member count is unequal", c.members, membs) + } + omembs := make([]*Member, 0) + for _, m := range c.members { + omembs = append(omembs, m) + } + sort.Sort(SortableMemberSliceByPeerURLs(omembs)) + sort.Sort(SortableMemberSliceByPeerURLs(membs)) + for i := range omembs { + if !reflect.DeepEqual(omembs[i].PeerURLs, membs[i].PeerURLs) { + return fmt.Errorf("unmatched member while checking PeerURLs") + } + omembs[i].ID = membs[i].ID + } + c.members = make(map[uint64]*Member) + for _, m := range omembs { + c.members[m.ID] = m + } + return nil +} + func (c *Cluster) genID() { mIDs := c.MemberIDs() b := make([]byte, 8*len(mIDs)) diff --git a/etcdserver/cluster_state.go b/etcdserver/cluster_state.go index 3ff903e7e..c25cea6ff 100644 --- a/etcdserver/cluster_state.go +++ b/etcdserver/cluster_state.go @@ -21,12 +21,14 @@ import ( ) const ( - ClusterStateValueNew = "new" + ClusterStateValueNew = "new" + ClusterStateValueExisting = "existing" ) var ( ClusterStateValues = []string{ ClusterStateValueNew, + ClusterStateValueExisting, } ) diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index d3dc68bb8..12926ef22 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -282,6 +282,76 @@ func TestClusterClientURLs(t *testing.T) { } } +func TestClusterValidateAndAssignIDsBad(t *testing.T) { + tests := []struct { + clmembs []Member + membs []*Member + }{ + { + // unmatched length + []Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + }, + []*Member{}, + }, + { + // unmatched peer urls + []Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + }, + []*Member{ + newTestMemberp(1, []string{"http://127.0.0.1:4001"}, "", nil), + }, + }, + { + // unmatched peer urls + []Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), + }, + []*Member{ + newTestMemberp(1, 
[]string{"http://127.0.0.1:2379"}, "", nil), + newTestMemberp(2, []string{"http://127.0.0.2:4001"}, "", nil), + }, + }, + } + for i, tt := range tests { + cl := newTestCluster(tt.clmembs) + if err := cl.ValidateAndAssignIDs(tt.membs); err == nil { + t.Errorf("#%d: unexpected update success", i) + } + } +} + +func TestClusterValidateAndAssignIDs(t *testing.T) { + tests := []struct { + clmembs []Member + membs []*Member + wids []uint64 + }{ + { + []Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), + }, + []*Member{ + newTestMemberp(3, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMemberp(4, []string{"http://127.0.0.2:2379"}, "", nil), + }, + []uint64{3, 4}, + }, + } + for i, tt := range tests { + cl := newTestCluster(tt.clmembs) + if err := cl.ValidateAndAssignIDs(tt.membs); err != nil { + t.Errorf("#%d: unexpect update error: %v", i, err) + } + if !reflect.DeepEqual(cl.MemberIDs(), tt.wids) { + t.Errorf("#%d: ids = %v, want %v", i, cl.MemberIDs(), tt.wids) + } + } +} + func TestClusterGenID(t *testing.T) { cs := newTestCluster([]Member{ newTestMember(1, nil, "", nil), diff --git a/etcdserver/member.go b/etcdserver/member.go index 51e03a65e..28da0becb 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -96,3 +96,11 @@ func parseMemberID(key string) uint64 { func removedMemberStoreKey(id uint64) string { return path.Join(storeRemovedMembersPrefix, idAsHex(id)) } + +type SortableMemberSliceByPeerURLs []*Member + +func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/etcdserver/server.go b/etcdserver/server.go index b11f8df9b..d8266391f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -19,8 +19,10 @@ package etcdserver import ( "encoding/json" "errors" + "io/ioutil" "log" "math/rand" + "net/http" "os" "path" "strconv" @@ -176,7 +178,17 @@ func NewServer(cfg *ServerConfig) *EtcdServer { var w *wal.WAL var n raft.Node var id uint64 - if !wal.Exist(cfg.WALDir()) { + haveWAL := wal.Exist(cfg.WALDir()) + switch { + case !haveWAL && cfg.ClusterState == ClusterStateValueExisting: + cl := getClusterFromPeers(cfg.Cluster.PeerURLs()) + if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil { + log.Fatalf("etcdserver: %v", err) + } + cfg.Cluster.SetID(cl.id) + cfg.Cluster.SetStore(st) + id, n, w = startNode(cfg, nil) + case !haveWAL && cfg.ClusterState == ClusterStateValueNew: if err := cfg.VerifyBootstrapConfig(); err != nil { log.Fatalf("etcdserver: %v", err) } @@ -195,8 +207,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { } } cfg.Cluster.SetStore(st) - id, n, w = startNode(cfg) - } else { + id, n, w = startNode(cfg, cfg.Cluster.MemberIDs()) + case haveWAL: if cfg.ShouldDiscover() { log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir()) } @@ -212,6 +224,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { } cfg.Cluster = NewClusterFromStore(cfg.Cluster.name, st) id, n, w = restartNode(cfg, index, snapshot) + default: + log.Fatalf("etcdserver: unsupported bootstrap config") } sstats := &stats.ServerStats{ @@ -642,7 +656,35 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { s.storage.Cut() } -func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) { 
+func getClusterFromPeers(urls []string) *Cluster { + for _, u := range urls { + resp, err := http.Get(u + "/members") + if err != nil { + log.Printf("etcdserver: get /members on %s: %v", u, err) + continue + } + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("etcdserver: read body error: %v", err) + continue + } + var membs []*Member + if err := json.Unmarshal(b, &membs); err != nil { + log.Printf("etcdserver: unmarshal body error: %v", err) + continue + } + id, err := strconv.ParseUint(resp.Header.Get("X-Etcd-Cluster-ID"), 16, 64) + if err != nil { + log.Printf("etcdserver: parse uint error: %v", err) + continue + } + return NewClusterFromMembers("", id, membs) + } + log.Fatalf("etcdserver: could not retrieve cluster information from the given urls") + return nil +} + +func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.WAL) { var err error // TODO: remove the discoveryURL when it becomes part of the source for // generating nodeID. @@ -651,7 +693,6 @@ func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) { if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { log.Fatal(err) } - ids := cfg.Cluster.MemberIDs() peers := make([]raft.Peer, len(ids)) for i, id := range ids { ctx, err := json.Marshal((*cfg.Cluster).Member(id))
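
For illustration only (not part of the patch): a minimal, self-contained sketch of the bootstrap flow added above for a member that starts with cluster state "existing". The joining node asks one of its configured peers for GET /members, reads the cluster ID from the X-Etcd-Cluster-ID response header, and checks that the remote member list matches its local configuration by comparing sorted peer URLs before adopting the remote member IDs. The member struct, JSON field names, and URLs below are simplified stand-ins for the etcdserver types, not the patched code itself.

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"reflect"
	"sort"
	"strconv"
)

// member is a simplified stand-in for etcdserver.Member; the JSON field
// names here are assumptions made for this sketch only.
type member struct {
	ID       uint64   `json:"id"`
	PeerURLs []string `json:"peerURLs"`
}

// byPeerURLs mirrors SortableMemberSliceByPeerURLs from the patch: members
// are ordered by their first peer URL so that two views of the same cluster
// can be compared position by position.
type byPeerURLs []*member

func (p byPeerURLs) Len() int           { return len(p) }
func (p byPeerURLs) Less(i, j int) bool { return p[i].PeerURLs[0] < p[j].PeerURLs[0] }
func (p byPeerURLs) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

// fetchMembers asks one existing peer for the current member list and the
// cluster ID, the same information getClusterFromPeers reads.
func fetchMembers(peerURL string) (uint64, []*member, error) {
	resp, err := http.Get(peerURL + "/members")
	if err != nil {
		return 0, nil, err
	}
	defer resp.Body.Close()
	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return 0, nil, err
	}
	var membs []*member
	if err := json.Unmarshal(b, &membs); err != nil {
		return 0, nil, err
	}
	cid, err := strconv.ParseUint(resp.Header.Get("X-Etcd-Cluster-ID"), 16, 64)
	if err != nil {
		return 0, nil, err
	}
	return cid, membs, nil
}

// validateAndAssignIDs mirrors Cluster.ValidateAndAssignIDs: both views must
// contain exactly the same peer URLs once sorted; if they do, the local view
// adopts the IDs the running cluster already uses.
func validateAndAssignIDs(local, remote []*member) error {
	if len(local) != len(remote) {
		return fmt.Errorf("member count mismatch: local %d, remote %d", len(local), len(remote))
	}
	sort.Sort(byPeerURLs(local))
	sort.Sort(byPeerURLs(remote))
	for i := range local {
		if !reflect.DeepEqual(local[i].PeerURLs, remote[i].PeerURLs) {
			return fmt.Errorf("local peer URLs %v do not match remote %v", local[i].PeerURLs, remote[i].PeerURLs)
		}
		local[i].ID = remote[i].ID
	}
	return nil
}

func main() {
	// Hypothetical peer URL of an already-running cluster.
	cid, remote, err := fetchMembers("http://127.0.0.1:7001")
	if err != nil {
		fmt.Println("fetch members:", err)
		return
	}
	// What the joining node was configured with locally; IDs are unknown
	// until they are adopted from the remote view.
	local := []*member{
		{PeerURLs: []string{"http://127.0.0.1:7001"}},
		{PeerURLs: []string{"http://127.0.0.2:7001"}},
	}
	if err := validateAndAssignIDs(local, remote); err != nil {
		fmt.Println("validate:", err)
		return
	}
	fmt.Printf("joining cluster %x with %d validated members\n", cid, len(local))
}

Once validation succeeds, the patched server calls startNode(cfg, nil) in the existing-cluster case, so the joining node does not seed raft bootstrap entries of its own; the membership it just validated is already recorded in the running cluster.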