Merge pull request #1340 from barakmich/better_ids2

etcdserver: Calculate IDs based on PeerURLs and --initial-cluster-name
This commit is contained in:
Barak Michener 2014-10-22 14:49:49 -04:00
commit 13656eb4e7
13 changed files with 132 additions and 84 deletions

View File

@ -167,3 +167,9 @@ Etcd can also do internal server-to-server communication using SSL client certs.
To do this just change the `-*-file` flags to `-peer-*-file`.
If you are using SSL for server-to-server communication, you must use it on all instances of etcd.
### Bootstrapping a new cluster by name
An etcd server is uniquely identified by the peer addresses it listens on. Suppose, however, that you wish to start over while keeping the data from the previous cluster -- that is, to pretend that this machine has never joined a cluster before.
You can use `--initial-cluster-name` to generate a new unique ID for each node; the name acts as a shared token that every node in the cluster understands. Each node's ID is derived from its peer URLs together with this name, and the cluster ID is in turn derived from the node IDs, so changing the name also provides a way for a machine to listen on the same interfaces, disconnect from one cluster, and join a different cluster.

View File

@ -32,15 +32,12 @@ import (
// Cluster is a list of Members that belong to the same raft cluster
type Cluster struct {
id uint64
name string
members map[uint64]*Member
}
func NewCluster() *Cluster {
return &Cluster{members: make(map[uint64]*Member)}
}
func (c Cluster) FindID(id uint64) *Member {
return c.members[id]
func NewCluster(clusterName string) *Cluster {
return &Cluster{name: clusterName, members: make(map[uint64]*Member)}
}
func (c Cluster) FindName(name string) *Member {
@ -49,10 +46,13 @@ func (c Cluster) FindName(name string) *Member {
return m
}
}
return nil
}
// FindID returns the Member stored under the given id in the cluster's
// member map, or nil if no entry exists for that id.
func (c Cluster) FindID(id uint64) *Member {
return c.members[id]
}
func (c Cluster) Add(m Member) error {
if c.FindID(m.ID) != nil {
return fmt.Errorf("Member exists with identical ID %v", m)
@ -86,10 +86,10 @@ func (c Cluster) Pick(id uint64) string {
return ""
}
// Set parses command line sets of names to IPs formatted like:
// SetMembersFromString parses a set of names to IPs, either from the command line or discovery, formatted like:
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
func (c *Cluster) Set(s string) error {
*c = *NewCluster()
func (c *Cluster) SetMembersFromString(s string) error {
c.members = make(map[uint64]*Member)
v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
if err != nil {
return err
@ -100,7 +100,7 @@ func (c *Cluster) Set(s string) error {
return fmt.Errorf("Empty URL given for %q", name)
}
m := newMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), nil)
m := newMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
err := c.Add(*m)
if err != nil {
return err
@ -109,13 +109,23 @@ func (c *Cluster) Set(s string) error {
return nil
}
// AddMemberFromURLs builds a new Member from the given name and peer URLs
// using newMember (passing this cluster's name and a nil time) and adds it
// to the cluster. It returns the new Member, or nil and the error reported
// by Add if the member could not be added.
func (c *Cluster) AddMemberFromURLs(name string, urls types.URLs) (*Member, error) {
m := newMember(name, urls, c.name, nil)
err := c.Add(*m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *Cluster) GenID(salt []byte) {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))
for i, id := range mIDs {
binary.BigEndian.PutUint64(b[8*i:], id)
}
hash := sha1.Sum(append(b, salt...))
b = append(b, salt...)
hash := sha1.Sum(b)
c.id = binary.BigEndian.Uint64(hash[:8])
}

View File

@ -50,7 +50,8 @@ type clusterStore struct {
Store store.Store
// TODO: write the id into the actual store?
// TODO: save the id as string?
id uint64
id uint64
clusterName string
}
// Add puts a new Member into the store.
@ -76,7 +77,7 @@ func (s *clusterStore) Add(m Member) {
// TODO(philips): keep the latest copy without going to the store to avoid the
// lock here.
func (s *clusterStore) Get() Cluster {
c := NewCluster()
c := NewCluster(s.clusterName)
c.id = s.id
e, err := s.Store.Get(storeMembersPrefix, true, true)
if err != nil {

View File

@ -93,7 +93,7 @@ func TestClusterStoreGet(t *testing.T) {
},
}
for i, tt := range tests {
c := NewCluster()
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Fatal(err)
}

View File

@ -28,7 +28,7 @@ func TestClusterAddSlice(t *testing.T) {
}{
{
[]Member{},
NewCluster(),
NewCluster(""),
},
{
[]Member{
@ -44,7 +44,7 @@ func TestClusterAddSlice(t *testing.T) {
},
}
for i, tt := range tests {
c := NewCluster()
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("#%d: err=%#v, want nil", i, err)
continue
@ -134,10 +134,10 @@ func TestClusterFind(t *testing.T) {
},
}
for i, tt := range tests {
c := NewCluster()
c := NewCluster("")
c.AddSlice(tt.mems)
m := c.FindName(tt.name)
m := c.FindID(tt.id)
if m == nil && !tt.match {
continue
}
@ -150,7 +150,7 @@ func TestClusterFind(t *testing.T) {
}
for i, tt := range tests {
c := NewCluster()
c := NewCluster("")
c.AddSlice(tt.mems)
m := c.FindID(tt.id)
@ -181,13 +181,13 @@ func TestClusterSet(t *testing.T) {
},
}
for i, tt := range tests {
c := NewCluster()
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Error(err)
}
g := Cluster{}
g.Set(tt.f)
g.SetMembersFromString(tt.f)
if g.String() != c.String() {
t.Errorf("#%d: set = %v, want %v", i, g, c)
@ -196,7 +196,7 @@ func TestClusterSet(t *testing.T) {
}
func TestClusterGenID(t *testing.T) {
cs := NewCluster()
cs := NewCluster("")
cs.AddSlice([]Member{
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
@ -232,15 +232,15 @@ func TestClusterSetBad(t *testing.T) {
// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
}
for i, tt := range tests {
g := NewCluster()
if err := g.Set(tt); err == nil {
g := NewCluster("")
if err := g.SetMembersFromString(tt); err == nil {
t.Errorf("#%d: set = %v, want err", i, tt)
}
}
}
func TestClusterMemberIDs(t *testing.T) {
cs := NewCluster()
cs := NewCluster("")
cs.AddSlice([]Member{
newTestMember(1, nil, "", nil),
newTestMember(4, nil, "", nil),
@ -259,7 +259,7 @@ func TestClusterAddBad(t *testing.T) {
newTestMember(1, nil, "mem1", nil),
newTestMember(1, nil, "mem2", nil),
}
c := NewCluster()
c := NewCluster("")
c.Add(newTestMember(1, nil, "mem1", nil))
for i, m := range mems {
if err := c.Add(m); err == nil {
@ -315,7 +315,7 @@ func TestClusterPeerURLs(t *testing.T) {
}
for i, tt := range tests {
c := NewCluster()
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("AddSlice error: %v", err)
continue
@ -374,7 +374,7 @@ func TestClusterClientURLs(t *testing.T) {
}
for i, tt := range tests {
c := NewCluster()
c := NewCluster("")
if err := c.AddSlice(tt.mems); err != nil {
t.Errorf("AddSlice error: %v", err)
continue

View File

@ -27,6 +27,7 @@ import (
// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
type ServerConfig struct {
NodeID uint64
Name string
DiscoveryURL string
ClientURLs types.URLs
@ -40,17 +41,18 @@ type ServerConfig struct {
// VerifyBootstrapConfig sanity-checks the initial config and returns an error
// for things that should never happen.
func (c *ServerConfig) VerifyBootstrapConfig() error {
if c.NodeID == raft.None {
return fmt.Errorf("could not use %x as member id", raft.None)
}
if c.DiscoveryURL == "" && c.ClusterState != ClusterStateValueNew {
return fmt.Errorf("initial cluster state unset and no wal or discovery URL found")
}
// Make sure the cluster at least contains the local server.
m := c.Cluster.FindName(c.Name)
m := c.Cluster.FindID(c.NodeID)
if m == nil {
return fmt.Errorf("could not find name %v in cluster", c.Name)
}
if m.ID == raft.None {
return fmt.Errorf("could not use %x as member id", raft.None)
return fmt.Errorf("couldn't find local ID in cluster config")
}
// No identical IPs in the cluster peer list
@ -58,7 +60,7 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
for _, m := range c.Cluster.Members() {
for _, url := range m.PeerURLs {
if urlMap[url] {
return fmt.Errorf("duplicate url %v in server config", url)
return fmt.Errorf("duplicate url %v in cluster config", url)
}
urlMap[url] = true
}
@ -70,8 +72,6 @@ func (c *ServerConfig) WALDir() string { return path.Join(c.DataDir, "wal") }
func (c *ServerConfig) SnapDir() string { return path.Join(c.DataDir, "snap") }
func (c *ServerConfig) ID() uint64 { return c.Cluster.FindName(c.Name).ID }
func (c *ServerConfig) ShouldDiscover() bool {
return c.DiscoveryURL != ""
}

View File

@ -45,15 +45,20 @@ func TestBootstrapConfigVerify(t *testing.T) {
for i, tt := range tests {
cluster := &Cluster{}
cluster.Set(tt.clusterSetting)
err := cluster.SetMembersFromString(tt.clusterSetting)
if err != nil && tt.shouldError {
continue
}
cfg := ServerConfig{
Name: "node1",
NodeID: 0x7350a9cd4dc16f76,
DiscoveryURL: tt.disc,
Cluster: cluster,
ClusterState: tt.clst,
}
err := cfg.VerifyBootstrapConfig()
err = cfg.VerifyBootstrapConfig()
if (err == nil) && tt.shouldError {
t.Errorf("%#v", *cluster)
t.Errorf("#%d: Got no error where one was expected", i)
}
if (err != nil) && !tt.shouldError {

View File

@ -1661,7 +1661,7 @@ type fakeCluster struct {
func (c *fakeCluster) Add(m etcdserver.Member) { return }
func (c *fakeCluster) Get() etcdserver.Cluster {
cl := etcdserver.NewCluster()
cl := etcdserver.NewCluster("")
cl.AddSlice(c.members)
return *cl
}

View File

@ -22,6 +22,7 @@ import (
"fmt"
"log"
"path"
"sort"
"strconv"
"time"
@ -48,17 +49,19 @@ type Member struct {
// newMember creates a Member without an ID and generates one based on the
// sorted peer URLs, the cluster name and, if non-nil, the given time. The
// member name does not affect the ID. This is used for bootstrapping.
func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
m := &Member{
RaftAttributes: RaftAttributes{PeerURLs: peerURLs.StringSlice()},
Attributes: Attributes{Name: name},
}
b := []byte(m.Name)
var b []byte
sort.Strings(m.PeerURLs)
for _, p := range m.PeerURLs {
b = append(b, []byte(p)...)
}
b = append(b, []byte(clusterName)...)
if now != nil {
b = append(b, []byte(fmt.Sprintf("%d", now.Unix()))...)
}

View File

@ -35,8 +35,17 @@ func TestMemberTime(t *testing.T) {
mem *Member
id uint64
}{
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil), 11240395089494390470},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, timeParse("1984-12-23T15:04:05Z")), 5483967913615174889},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", nil), 14544069596553697298},
// Same ID, different name (names shouldn't matter)
{newMember("memfoo", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", nil), 14544069596553697298},
// Same URL, different time (a non-nil time changes the ID)
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", timeParse("1984-12-23T15:04:05Z")), 2448790162483548276},
// Different cluster name
{newMember("mcm1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "etcd", timeParse("1984-12-23T15:04:05Z")), 6973882743191604649},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, "", timeParse("1984-12-23T15:04:05Z")), 1466075294948436910},
// Order shouldn't matter
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "10.0.0.2:2379"}}, "", nil), 16552244735972308939},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.2:2379"}, {Scheme: "http", Host: "10.0.0.1:2379"}}, "", nil), 16552244735972308939},
}
for i, tt := range tests {
if tt.mem.ID != tt.id {

View File

@ -182,7 +182,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
log.Fatalf("etcdserver: %v", err)
}
if cfg.ShouldDiscover() {
d, err := discovery.New(cfg.DiscoveryURL, cfg.ID(), cfg.Cluster.String())
d, err := discovery.New(cfg.DiscoveryURL, cfg.NodeID, cfg.Cluster.String())
if err != nil {
log.Fatalf("etcdserver: cannot init discovery %v", err)
}
@ -190,7 +190,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
if err != nil {
log.Fatalf("etcdserver: %v", err)
}
if err = cfg.Cluster.Set(s); err != nil {
if err = cfg.Cluster.SetMembersFromString(s); err != nil {
log.Fatalf("etcdserver: %v", err)
}
}
@ -216,9 +216,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
sstats := &stats.ServerStats{
Name: cfg.Name,
ID: idAsHex(cfg.ID()),
ID: idAsHex(cfg.NodeID),
}
lstats := stats.NewLeaderStats(idAsHex(cfg.ID()))
lstats := stats.NewLeaderStats(idAsHex(cfg.NodeID))
s := &EtcdServer{
store: st,
@ -648,7 +648,7 @@ func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
// TODO: remove the discoveryURL when it becomes part of the source for
// generating nodeID.
cfg.Cluster.GenID([]byte(cfg.DiscoveryURL))
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: cfg.ID(), ClusterID: cfg.Cluster.ID()})
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: cfg.NodeID, ClusterID: cfg.Cluster.ID()})
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
log.Fatal(err)
}
@ -661,9 +661,9 @@ func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
}
peers[i] = raft.Peer{ID: id, Context: ctx}
}
id, cid = cfg.ID(), cfg.Cluster.ID()
id, cid = cfg.NodeID, cfg.Cluster.ID()
log.Printf("etcdserver: start node %d in cluster %d", id, cid)
n = raft.StartNode(cfg.ID(), peers, 10, 1)
n = raft.StartNode(cfg.NodeID, peers, 10, 1)
return
}

View File

@ -19,7 +19,10 @@ import (
"github.com/coreos/etcd/pkg/types"
)
const tickDuration = 5 * time.Millisecond
const (
tickDuration = 5 * time.Millisecond
clusterName = "etcd"
)
func init() {
// open microsecond-level time log for integration test debugging
@ -82,16 +85,18 @@ func (c *cluster) Launch(t *testing.T) {
}
lns := make([]net.Listener, c.Size)
bootstrapCfgs := make([]string, c.Size)
clusterCfg := etcdserver.NewCluster(clusterName)
for i := 0; i < c.Size; i++ {
l := newLocalListener(t)
// each member claims only one peer listener
lns[i] = l
bootstrapCfgs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String())
}
clusterCfg := &etcdserver.Cluster{}
if err := clusterCfg.Set(strings.Join(bootstrapCfgs, ",")); err != nil {
t.Fatal(err)
listenURLs, err := types.NewURLs([]string{"http://" + l.Addr().String()})
if err != nil {
t.Fatal(err)
}
if _, err = clusterCfg.AddMemberFromURLs(c.name(i), listenURLs); err != nil {
t.Fatal(err)
}
}
var err error
@ -100,6 +105,7 @@ func (c *cluster) Launch(t *testing.T) {
m.PeerListeners = []net.Listener{lns[i]}
cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln}
m.NodeID = clusterCfg.FindName(c.name(i)).ID
m.Name = c.name(i)
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil {

58
main.go
View File

@ -47,8 +47,10 @@ var (
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = fs.Bool("version", false, "Print the version and exit")
cluster = &etcdserver.Cluster{}
clusterState = new(etcdserver.ClusterState)
initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
initialClusterName = fs.String("initial-cluster-name", "etcd", "Initial name for the etcd cluster during bootstrap")
cluster = &etcdserver.Cluster{}
clusterState = new(etcdserver.ClusterState)
cors = &pkg.CORSInfo{}
proxyFlag = new(flagtypes.Proxy)
@ -74,11 +76,6 @@ var (
)
func init() {
fs.Var(cluster, "initial-cluster", "Initial cluster configuration for bootstrapping")
if err := cluster.Set("default=http://localhost:2380,default=http://localhost:7001"); err != nil {
// Should never happen
log.Panic(err)
}
fs.Var(clusterState, "initial-cluster-state", "Initial cluster configuration for bootstrapping")
clusterState.Set(etcdserver.ClusterStateValueNew)
@ -131,9 +128,6 @@ func main() {
}
pkg.SetFlagsFromEnv(fs)
if err := setClusterForDiscovery(); err != nil {
log.Fatalf("etcd: %v", err)
}
if string(*proxyFlag) == flagtypes.ProxyValueOff {
startEtcd()
@ -147,6 +141,11 @@ func main() {
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
func startEtcd() {
self, err := setupCluster()
if err != nil {
log.Fatalf("etcd: setupCluster returned error %v", err)
}
if *dir == "" {
*dir = fmt.Sprintf("%v_etcd_data", *name)
log.Printf("etcd: no data-dir provided, using default data-dir ./%s", *dir)
@ -165,6 +164,7 @@ func startEtcd() {
log.Fatal(err.Error())
}
cfg := &etcdserver.ServerConfig{
NodeID: self.ID,
Name: *name,
ClientURLs: acurls,
DataDir: *dir,
@ -262,28 +262,36 @@ func startProxy() {
}
}
// setClusterForDiscovery sets cluster to a temporary value if you are using
// the discovery.
func setClusterForDiscovery() error {
// setupCluster sets cluster to a temporary value if you are using
// discovery, or to the static configuration for bootstrapped clusters.
// Returns the local member on success.
func setupCluster() (*etcdserver.Member, error) {
cluster = etcdserver.NewCluster(*initialClusterName)
cluster.SetMembersFromString(*initialCluster)
set := make(map[string]bool)
flag.Visit(func(f *flag.Flag) {
set[f.Name] = true
})
if set["discovery"] && set["initial-cluster"] {
return fmt.Errorf("both discovery and initial-cluster are set")
return nil, fmt.Errorf("both discovery and bootstrap-config are set")
}
apurls, err := pkg.URLsFromFlags(fs, "advertise-peer-urls", "addr", peerTLSInfo)
if err != nil {
return nil, err
}
if set["discovery"] {
apurls, err := pkg.URLsFromFlags(fs, "advertise-peer-urls", "peer-addr", peerTLSInfo)
if err != nil {
return err
}
addrs := make([]string, len(apurls))
for i := range apurls {
addrs[i] = fmt.Sprintf("%s=%s", *name, apurls[i].String())
}
if err := cluster.Set(strings.Join(addrs, ",")); err != nil {
return err
cluster = etcdserver.NewCluster(*durl)
return cluster.AddMemberFromURLs(*name, apurls)
} else if set["initial-cluster"] {
// We're statically configured, and cluster has appropriately been set.
// Try to configure by indexing the static cluster by name.
for _, c := range cluster.Members() {
if c.Name == *name {
return c, nil
}
}
return nil, fmt.Errorf("cannot find the passed name %s in --initial-cluster bootstrap list.", *name)
}
return nil
return cluster.FindName(*name), nil
}