etcdserver: add support for force cluster

This commit is contained in:
Xiang Li 2014-11-06 21:50:35 -08:00
parent 376268391b
commit 0a9c6164af
5 changed files with 272 additions and 30 deletions

View File

@ -53,13 +53,14 @@ const (
)
var (
fs = flag.NewFlagSet("etcd", flag.ContinueOnError)
name = fs.String("name", "default", "Unique human-readable name for this node")
dir = fs.String("data-dir", "", "Path to the data directory")
durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster")
dproxy = fs.String("discovery-proxy", "", "HTTP proxy to use for traffic to discovery service")
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = fs.Bool("version", false, "Print the version and exit")
fs = flag.NewFlagSet("etcd", flag.ContinueOnError)
name = fs.String("name", "default", "Unique human-readable name for this node")
dir = fs.String("data-dir", "", "Path to the data directory")
durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster")
dproxy = fs.String("discovery-proxy", "", "HTTP proxy to use for traffic to discovery service")
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = fs.Bool("version", false, "Print the version and exit")
forceNewCluster = fs.Bool("force-new-cluster", false, "Force to create a new one member cluster")
initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
initialClusterToken = fs.String("initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap")
@ -262,15 +263,16 @@ func startEtcd() error {
}
cfg := &etcdserver.ServerConfig{
Name: *name,
ClientURLs: acurls,
DataDir: *dir,
SnapCount: *snapCount,
Cluster: cls,
DiscoveryURL: *durl,
DiscoveryProxy: *dproxy,
NewCluster: clusterStateFlag.String() == clusterStateFlagNew,
Transport: pt,
Name: *name,
ClientURLs: acurls,
DataDir: *dir,
SnapCount: *snapCount,
Cluster: cls,
DiscoveryURL: *durl,
DiscoveryProxy: *dproxy,
NewCluster: clusterStateFlag.String() == clusterStateFlagNew,
Transport: pt,
ForceNewCluster: *forceNewCluster,
}
var s *etcdserver.EtcdServer
s, err = etcdserver.NewServer(cfg)

View File

@ -27,15 +27,16 @@ import (
// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
type ServerConfig struct {
Name string
DiscoveryURL string
DiscoveryProxy string
ClientURLs types.URLs
DataDir string
SnapCount uint64
Cluster *Cluster
NewCluster bool
Transport *http.Transport
Name string
DiscoveryURL string
DiscoveryProxy string
ClientURLs types.URLs
DataDir string
SnapCount uint64
Cluster *Cluster
NewCluster bool
Transport *http.Transport
ForceNewCluster bool
}
// VerifyBootstrapConfig sanity-checks the initial config and returns an error

111
etcdserver/force_cluster.go Normal file
View File

@ -0,0 +1,111 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdserver
import (
"log"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
)
func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) {
var err error
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
}
id, cid, st, ents, err := readWAL(w, index)
if err != nil {
log.Fatalf("etcdserver: read wal error: %v", err)
}
cfg.Cluster.SetID(cid)
// discard the previously uncommitted entries
if len(ents) != 0 {
ents = ents[:st.Commit+1]
}
// force append the configuration change entries
toAppEnts := createConfigChangeEnts(getIDset(snapshot, ents), uint64(id), st.Term, st.Commit)
ents = append(ents, toAppEnts...)
// force commit newly appended entries
for _, e := range toAppEnts {
err := w.SaveEntry(&e)
if err != nil {
log.Fatalf("etcdserver: %v", err)
}
}
if len(ents) != 0 {
st.Commit = ents[len(ents)-1].Index
}
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
return
}
func getIDset(snap *raftpb.Snapshot, ents []raftpb.Entry) map[uint64]bool {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Nodes {
ids[id] = true
}
}
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
continue
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
switch cc.Type {
case raftpb.ConfChangeAddNode:
ids[cc.NodeID] = true
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
default:
log.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
}
}
return ids
}
func createConfigChangeEnts(ids map[uint64]bool, self uint64, term, index uint64) []raftpb.Entry {
ents := make([]raftpb.Entry, 0)
next := index + 1
for id := range ids {
if id == self {
continue
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
Term: term,
Index: next,
}
ents = append(ents, e)
next++
}
return ents
}

View File

@ -0,0 +1,114 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcdserver
import (
"reflect"
"testing"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft/raftpb"
)
func TestGetIDset(t *testing.T) {
addcc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
addEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc)}
removecc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
removeEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc)}
normalEntry := raftpb.Entry{Type: raftpb.EntryNormal}
tests := []struct {
snap *raftpb.Snapshot
ents []raftpb.Entry
widSet map[uint64]bool
}{
{nil, []raftpb.Entry{}, map[uint64]bool{}},
{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{}, map[uint64]bool{1: true}},
{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry}, map[uint64]bool{1: true, 2: true}},
{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry}, map[uint64]bool{1: true}},
{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, normalEntry}, map[uint64]bool{1: true, 2: true}},
{&raftpb.Snapshot{Nodes: []uint64{1}}, []raftpb.Entry{addEntry, removeEntry, normalEntry}, map[uint64]bool{1: true}},
}
for i, tt := range tests {
idSet := getIDset(tt.snap, tt.ents)
if !reflect.DeepEqual(idSet, tt.widSet) {
t.Errorf("#%d: idset = %v, want %v", i, idSet, tt.widSet)
}
}
}
func TestCreateConfigChangeEnts(t *testing.T) {
removecc2 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
removecc3 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 3}
tests := []struct {
ids map[uint64]bool
self uint64
term, index uint64
wents []raftpb.Entry
}{
{
map[uint64]bool{1: true},
1,
1, 1,
[]raftpb.Entry{},
},
{
map[uint64]bool{1: true, 2: true},
1,
1, 1,
[]raftpb.Entry{{Term: 1, Index: 2, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
},
{
map[uint64]bool{1: true, 2: true},
1,
2, 2,
[]raftpb.Entry{{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
},
{
map[uint64]bool{1: true, 2: true, 3: true},
1,
2, 2,
[]raftpb.Entry{
{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)},
{Term: 2, Index: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
},
},
{
map[uint64]bool{2: true, 3: true},
2,
2, 2,
[]raftpb.Entry{
{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
},
},
}
for i, tt := range tests {
gents := createConfigChangeEnts(tt.ids, tt.self, tt.term, tt.index)
if !reflect.DeepEqual(gents, tt.wents) {
t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
}
}
}

View File

@ -230,7 +230,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
index = snapshot.Index
}
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
id, n, w = restartNode(cfg, index, snapshot)
if !cfg.ForceNewCluster {
id, n, w = restartNode(cfg, index, snapshot)
} else {
id, n, w = restartAsStandaloneNode(cfg, index, snapshot)
}
default:
return nil, fmt.Errorf("unsupported bootstrap config")
}
@ -736,17 +740,27 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
}
wmetadata, st, ents, err := w.ReadAll()
id, clusterID, st, ents, err := readWAL(w, index)
if err != nil {
log.Fatalf("etcdserver: read wal error: %v", err)
}
cfg.Cluster.SetID(clusterID)
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
return
}
func readWAL(w *wal.WAL, index uint64) (id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry, err error) {
var wmetadata []byte
wmetadata, st, ents, err = w.ReadAll()
if err != nil {
return
}
var metadata pb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
id = types.ID(metadata.NodeID)
cfg.Cluster.SetID(types.ID(metadata.ClusterID))
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
cid = types.ID(metadata.ClusterID)
return
}