From 0a9c6164afac63af55269b5d9a13735f5de1a296 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 6 Nov 2014 21:50:35 -0800 Subject: [PATCH] etcdserver: add support for force cluster --- etcdmain/etcd.go | 34 ++++----- etcdserver/config.go | 19 +++--- etcdserver/force_cluster.go | 111 ++++++++++++++++++++++++++++++ etcdserver/force_cluster_test.go | 114 +++++++++++++++++++++++++++++++ etcdserver/server.go | 24 +++++-- 5 files changed, 272 insertions(+), 30 deletions(-) create mode 100644 etcdserver/force_cluster.go create mode 100644 etcdserver/force_cluster_test.go diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 7b96750b3..4dcd064d0 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -53,13 +53,14 @@ const ( ) var ( - fs = flag.NewFlagSet("etcd", flag.ContinueOnError) - name = fs.String("name", "default", "Unique human-readable name for this node") - dir = fs.String("data-dir", "", "Path to the data directory") - durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster") - dproxy = fs.String("discovery-proxy", "", "HTTP proxy to use for traffic to discovery service") - snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") - printVersion = fs.Bool("version", false, "Print the version and exit") + fs = flag.NewFlagSet("etcd", flag.ContinueOnError) + name = fs.String("name", "default", "Unique human-readable name for this node") + dir = fs.String("data-dir", "", "Path to the data directory") + durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster") + dproxy = fs.String("discovery-proxy", "", "HTTP proxy to use for traffic to discovery service") + snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") + printVersion = fs.Bool("version", false, "Print the version and exit") + forceNewCluster = fs.Bool("force-new-cluster", false, "Force to create a new one member cluster") initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping") initialClusterToken = fs.String("initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap") @@ -262,15 +263,16 @@ func startEtcd() error { } cfg := &etcdserver.ServerConfig{ - Name: *name, - ClientURLs: acurls, - DataDir: *dir, - SnapCount: *snapCount, - Cluster: cls, - DiscoveryURL: *durl, - DiscoveryProxy: *dproxy, - NewCluster: clusterStateFlag.String() == clusterStateFlagNew, - Transport: pt, + Name: *name, + ClientURLs: acurls, + DataDir: *dir, + SnapCount: *snapCount, + Cluster: cls, + DiscoveryURL: *durl, + DiscoveryProxy: *dproxy, + NewCluster: clusterStateFlag.String() == clusterStateFlagNew, + Transport: pt, + ForceNewCluster: *forceNewCluster, } var s *etcdserver.EtcdServer s, err = etcdserver.NewServer(cfg) diff --git a/etcdserver/config.go b/etcdserver/config.go index 1098c0791..f99607a85 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -27,15 +27,16 @@ import ( // ServerConfig holds the configuration of etcd as taken from the command line or discovery. type ServerConfig struct { - Name string - DiscoveryURL string - DiscoveryProxy string - ClientURLs types.URLs - DataDir string - SnapCount uint64 - Cluster *Cluster - NewCluster bool - Transport *http.Transport + Name string + DiscoveryURL string + DiscoveryProxy string + ClientURLs types.URLs + DataDir string + SnapCount uint64 + Cluster *Cluster + NewCluster bool + Transport *http.Transport + ForceNewCluster bool } // VerifyBootstrapConfig sanity-checks the initial config and returns an error diff --git a/etcdserver/force_cluster.go b/etcdserver/force_cluster.go new file mode 100644 index 000000000..1b4e4719c --- /dev/null +++ b/etcdserver/force_cluster.go @@ -0,0 +1,111 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdserver + +import ( + "log" + + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal" +) + +func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) { + var err error + if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil { + log.Fatalf("etcdserver: open wal error: %v", err) + } + id, cid, st, ents, err := readWAL(w, index) + if err != nil { + log.Fatalf("etcdserver: read wal error: %v", err) + } + cfg.Cluster.SetID(cid) + + // discard the previously uncommitted entries + if len(ents) != 0 { + ents = ents[:st.Commit+1] + } + + // force append the configuration change entries + toAppEnts := createConfigChangeEnts(getIDset(snapshot, ents), uint64(id), st.Term, st.Commit) + ents = append(ents, toAppEnts...) + + // force commit newly appended entries + for _, e := range toAppEnts { + err := w.SaveEntry(&e) + if err != nil { + log.Fatalf("etcdserver: %v", err) + } + } + if len(ents) != 0 { + st.Commit = ents[len(ents)-1].Index + } + + log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) + n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents) + return +} + +func getIDset(snap *raftpb.Snapshot, ents []raftpb.Entry) map[uint64]bool { + ids := make(map[uint64]bool) + if snap != nil { + for _, id := range snap.Nodes { + ids[id] = true + } + } + for _, e := range ents { + if e.Type != raftpb.EntryConfChange { + continue + } + var cc raftpb.ConfChange + pbutil.MustUnmarshal(&cc, e.Data) + switch cc.Type { + case raftpb.ConfChangeAddNode: + ids[cc.NodeID] = true + case raftpb.ConfChangeRemoveNode: + delete(ids, cc.NodeID) + default: + log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!") + } + } + return ids +} + +func createConfigChangeEnts(ids map[uint64]bool, self uint64, term, index uint64) []raftpb.Entry { + ents := make([]raftpb.Entry, 0) + next := index + 1 + for id := range ids { + if id == self { + continue + } + cc := &raftpb.ConfChange{ + Type: raftpb.ConfChangeRemoveNode, + NodeID: id, + } + e := raftpb.Entry{ + Type: raftpb.EntryConfChange, + Data: pbutil.MustMarshal(cc), + Term: term, + Index: next, + } + ents = append(ents, e) + next++ + } + return ents +} diff --git a/etcdserver/force_cluster_test.go b/etcdserver/force_cluster_test.go new file mode 100644 index 000000000..7b5d7beb4 --- /dev/null +++ b/etcdserver/force_cluster_test.go @@ -0,0 +1,114 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package etcdserver + +import ( + "reflect" + "testing" + + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestGetIDset(t *testing.T) { + addcc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2} + addEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc)} + removecc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2} + removeEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc)} + normalEntry := raftpb.Entry{Type: raftpb.EntryNormal} + + tests := []struct { + snap *raftpb.Snapshot + ents []raftpb.Entry + + widSet map[uint64]bool + }{ + {nil, []raftpb.Entry{}, map[uint64]bool{}}, + {&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{}, map[uint64]bool{1: true}}, + {&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry}, map[uint64]bool{1: true, 2: true}}, + {&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry}, map[uint64]bool{1: true}}, + {&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, normalEntry}, map[uint64]bool{1: true, 2: true}}, + {&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry, normalEntry}, map[uint64]bool{1: true}}, + } + + for i, tt := range tests { + idSet := getIDset(tt.snap, tt.ents) + if !reflect.DeepEqual(idSet, tt.widSet) { + t.Errorf("#%d: idset = %v, want %v", i, idSet, tt.widSet) + } + } +} + +func TestCreateConfigChangeEnts(t *testing.T) { + removecc2 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2} + removecc3 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 3} + tests := []struct { + ids map[uint64]bool + self uint64 + term, index uint64 + + wents []raftpb.Entry + }{ + { + map[uint64]bool{1: true}, + 1, + 1, 1, + + []raftpb.Entry{}, + }, + { + map[uint64]bool{1: true, 2: true}, + 1, + 1, 1, + + []raftpb.Entry{{Term: 1, Index: 2, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}}, + }, + { + map[uint64]bool{1: true, 2: true}, + 1, + 2, 2, + + []raftpb.Entry{{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}}, + }, + { + map[uint64]bool{1: true, 2: true, 3: true}, + 1, + 2, 2, + + []raftpb.Entry{ + {Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}, + {Term: 2, Index: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)}, + }, + }, + { + map[uint64]bool{2: true, 3: true}, + 2, + 2, 2, + + []raftpb.Entry{ + {Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)}, + }, + }, + } + + for i, tt := range tests { + gents := createConfigChangeEnts(tt.ids, tt.self, tt.term, tt.index) + if !reflect.DeepEqual(gents, tt.wents) { + t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents) + } + } +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 3f3356eb4..3d32c4737 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -230,7 +230,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { index = snapshot.Index } cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st) - id, n, w = restartNode(cfg, index, snapshot) + if !cfg.ForceNewCluster { + id, n, w = restartNode(cfg, index, snapshot) + } else { + id, n, w = restartAsStandaloneNode(cfg, index, snapshot) + } default: return nil, fmt.Errorf("unsupported bootstrap config") } @@ -736,17 +740,27 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil { log.Fatalf("etcdserver: open wal error: %v", err) } - wmetadata, st, ents, err := w.ReadAll() + id, clusterID, st, ents, err := readWAL(w, index) if err != nil { log.Fatalf("etcdserver: read wal error: %v", err) } + cfg.Cluster.SetID(clusterID) + log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) + n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents) + return +} + +func readWAL(w *wal.WAL, index uint64) (id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry, err error) { + var wmetadata []byte + wmetadata, st, ents, err = w.ReadAll() + if err != nil { + return + } var metadata pb.Metadata pbutil.MustUnmarshal(&metadata, wmetadata) id = types.ID(metadata.NodeID) - cfg.Cluster.SetID(types.ID(metadata.ClusterID)) - log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) - n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents) + cid = types.ID(metadata.ClusterID) return }