From 502a3c2460874b2b26fb34e7bd1b2d4b581d3f51 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 21 Oct 2014 18:02:57 -0400 Subject: [PATCH] Refactor Cluster to hold and add members. --- etcdserver/cluster.go | 32 +++++++++++++++++++++-------- etcdserver/cluster_test.go | 4 ++-- etcdserver/config_test.go | 2 +- etcdserver/member.go | 10 +++------ etcdserver/member_test.go | 15 +++++++------- etcdserver/server.go | 2 +- integration/cluster_test.go | 31 +++++++++++++--------------- main.go | 41 ++++++++++--------------------------- 8 files changed, 63 insertions(+), 74 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 717cefef8..a91f9106c 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -37,7 +37,17 @@ type Cluster struct { } func NewCluster(clusterName string) *Cluster { - return &Cluster{name: clusterName, members: make(map[uint64]*Member)} + c := &Cluster{name: clusterName, members: make(map[uint64]*Member)} + return c +} + +func (c Cluster) FindName(name string) *Member { + for _, m := range c.members { + if m.Name == name { + return m + } + } + return nil } func (c Cluster) FindID(id uint64) *Member { @@ -77,10 +87,10 @@ func (c Cluster) Pick(id uint64) string { return "" } -// Set parses command line sets of names to IPs formatted like: +// AddMembersFromFlag parses a sets of names to IPs either from the command line or discovery formatted like: // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3 -func (c *Cluster) Set(s string) error { - *c = *NewCluster(c.name) +func (c *Cluster) SetMembersFromString(s string) error { + c.members = make(map[uint64]*Member) v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1)) if err != nil { return err @@ -91,7 +101,7 @@ func (c *Cluster) Set(s string) error { return fmt.Errorf("Empty URL given for %q", name) } - m := newMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), &c.name, nil) + m := newMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil) err := c.Add(*m) if err != nil { return err @@ -100,15 +110,21 @@ func (c *Cluster) Set(s string) error { return nil } +func (c *Cluster) AddMemberFromURLs(name string, urls types.URLs) (*Member, error) { + m := newMember(name, urls, c.name, nil) + err := c.Add(*m) + if err != nil { + return nil, err + } + return m, nil +} + func (c *Cluster) GenID(salt []byte) { mIDs := c.MemberIDs() b := make([]byte, 8*len(mIDs)) for i, id := range mIDs { binary.BigEndian.PutUint64(b[8*i:], id) } - if len(c.name) > 0 { - b = append(b, []byte(c.name)...) - } b = append(b, salt...) hash := sha1.Sum(b) c.id = binary.BigEndian.Uint64(hash[:8]) diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index f7291fd54..0f36da058 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -187,7 +187,7 @@ func TestClusterSet(t *testing.T) { } g := Cluster{} - g.Set(tt.f) + g.SetMembersFromString(tt.f) if g.String() != c.String() { t.Errorf("#%d: set = %v, want %v", i, g, c) @@ -233,7 +233,7 @@ func TestClusterSetBad(t *testing.T) { } for i, tt := range tests { g := NewCluster("") - if err := g.Set(tt); err == nil { + if err := g.SetMembersFromString(tt); err == nil { t.Errorf("#%d: set = %v, want err", i, tt) } } diff --git a/etcdserver/config_test.go b/etcdserver/config_test.go index 1af9797f5..46b2d6835 100644 --- a/etcdserver/config_test.go +++ b/etcdserver/config_test.go @@ -45,7 +45,7 @@ func TestBootstrapConfigVerify(t *testing.T) { for i, tt := range tests { cluster := &Cluster{} - err := cluster.Set(tt.clusterSetting) + err := cluster.SetMembersFromString(tt.clusterSetting) if err != nil && tt.shouldError { continue } diff --git a/etcdserver/member.go b/etcdserver/member.go index cb1236708..8919b5504 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -49,7 +49,7 @@ type Member struct { // newMember creates a Member without an ID and generates one based on the // name, peer URLs. This is used for bootstrapping. -func newMember(name string, peerURLs types.URLs, clusterName *string, now *time.Time) *Member { +func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { m := &Member{ RaftAttributes: RaftAttributes{PeerURLs: peerURLs.StringSlice()}, Attributes: Attributes{Name: name}, @@ -61,8 +61,8 @@ func newMember(name string, peerURLs types.URLs, clusterName *string, now *time. b = append(b, []byte(p)...) } - if clusterName != nil { - b = append(b, []byte(*clusterName)...) + if clusterName != "" { + b = append(b, []byte(clusterName)...) } if now != nil { @@ -74,10 +74,6 @@ func newMember(name string, peerURLs types.URLs, clusterName *string, now *time. return m } -func NewMemberFromURLs(name string, urls types.URLs, clusterName *string) *Member { - return newMember(name, urls, clusterName, nil) -} - func (m Member) storeKey() string { return path.Join(storeMembersPrefix, idAsHex(m.ID)) } diff --git a/etcdserver/member_test.go b/etcdserver/member_test.go index 1250ba372..f0024eda8 100644 --- a/etcdserver/member_test.go +++ b/etcdserver/member_test.go @@ -31,22 +31,21 @@ func timeParse(value string) *time.Time { } func TestMemberTime(t *testing.T) { - var clusterName = "etcd" tests := []struct { mem *Member id uint64 }{ - {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil, nil), 14544069596553697298}, + {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", nil), 14544069596553697298}, // Same ID, different name (names shouldn't matter) - {newMember("memfoo", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil, nil), 14544069596553697298}, + {newMember("memfoo", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", nil), 14544069596553697298}, // Same ID, different Time - {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil, timeParse("1984-12-23T15:04:05Z")), 2448790162483548276}, + {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", timeParse("1984-12-23T15:04:05Z")), 2448790162483548276}, // Different cluster name - {newMember("mcm1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, &clusterName, timeParse("1984-12-23T15:04:05Z")), 6973882743191604649}, - {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, nil, timeParse("1984-12-23T15:04:05Z")), 1466075294948436910}, + {newMember("mcm1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "etcd", timeParse("1984-12-23T15:04:05Z")), 6973882743191604649}, + {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, "", timeParse("1984-12-23T15:04:05Z")), 1466075294948436910}, // Order shouldn't matter - {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "10.0.0.2:2379"}}, nil, nil), 16552244735972308939}, - {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.2:2379"}, {Scheme: "http", Host: "10.0.0.1:2379"}}, nil, nil), 16552244735972308939}, + {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "10.0.0.2:2379"}}, "", nil), 16552244735972308939}, + {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.2:2379"}, {Scheme: "http", Host: "10.0.0.1:2379"}}, "", nil), 16552244735972308939}, } for i, tt := range tests { if tt.mem.ID != tt.id { diff --git a/etcdserver/server.go b/etcdserver/server.go index 3a528fb87..ffde77c92 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -188,7 +188,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { if err != nil { log.Fatalf("etcdserver: %v", err) } - if err = cfg.Cluster.Set(s); err != nil { + if err = cfg.Cluster.SetMembersFromString(s); err != nil { log.Fatalf("etcdserver: %v", err) } } diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 5ff17c6cd..8245f7e00 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -21,10 +21,7 @@ import ( const ( tickDuration = 5 * time.Millisecond -) - -var ( - clusterName = "etcd" + clusterName = "etcd" ) func init() { @@ -88,30 +85,29 @@ func (c *cluster) Launch(t *testing.T) { } lns := make([]net.Listener, c.Size) - bootstrapCfgs := make([]string, c.Size) + clusterCfg := etcdserver.NewCluster(clusterName) for i := 0; i < c.Size; i++ { l := newLocalListener(t) // each member claims only one peer listener lns[i] = l - bootstrapCfgs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) - } - clusterCfg := etcdserver.NewCluster(clusterName) - if err := clusterCfg.Set(strings.Join(bootstrapCfgs, ",")); err != nil { - t.Fatal(err) + listenUrls, err := types.NewURLs([]string{"http://" + l.Addr().String()}) + if err != nil { + t.Fatal(err) + } + _, err = clusterCfg.AddMemberFromURLs(c.name(i), listenUrls) + if err != nil { + t.Fatal(err) + } } + var err error for i := 0; i < c.Size; i++ { m := member{} m.PeerListeners = []net.Listener{lns[i]} cln := newLocalListener(t) m.ClientListeners = []net.Listener{cln} - listenUrls, err := types.NewURLs([]string{"http://" + lns[i].Addr().String()}) - if err != nil { - t.Fatal(err) - } - localMember := *etcdserver.NewMemberFromURLs(c.name(i), listenUrls, &clusterName) - m.NodeID = localMember.ID - m.Name = localMember.Name + m.NodeID = clusterCfg.FindName(c.name(i)).ID + m.Name = c.name(i) m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) if err != nil { t.Fatal(err) @@ -154,6 +150,7 @@ func newLocalListener(t *testing.T) net.Listener { } type member struct { + id uint64 etcdserver.ServerConfig PeerListeners, ClientListeners []net.Listener diff --git a/main.go b/main.go index bb9fb6f94..ae11388a7 100644 --- a/main.go +++ b/main.go @@ -47,8 +47,10 @@ var ( snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") printVersion = fs.Bool("version", false, "Print the version and exit") - cluster = &etcdserver.Cluster{} - clusterState = new(etcdserver.ClusterState) + initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping") + initialClusterName = fs.String("initial-cluster-name", "etcd", "Initial name for the etcd cluster during bootstrap") + cluster = &etcdserver.Cluster{} + clusterState = new(etcdserver.ClusterState) cors = &pkg.CORSInfo{} proxyFlag = new(flagtypes.Proxy) @@ -56,8 +58,6 @@ var ( clientTLSInfo = transport.TLSInfo{} peerTLSInfo = transport.TLSInfo{} - initialClusterName *string - ignored = []string{ "cluster-active-size", "cluster-remove-delay", @@ -76,13 +76,6 @@ var ( ) func init() { - fs.StringVar(initialClusterName, "initial-cluster-name", "etcd", "Initial name for the etcd cluster during bootstrap.") - fs.Var(cluster, "initial-cluster", "Initial cluster configuration for bootstrapping") - cluster = etcdserver.NewCluster(*initialClusterName) - if err := cluster.Set("default=http://localhost:2380,default=http://localhost:7001"); err != nil { - // Should never happen - log.Panic(err) - } fs.Var(clusterState, "initial-cluster-state", "Initial cluster configuration for bootstrapping") clusterState.Set(etcdserver.ClusterStateValueNew) @@ -273,24 +266,23 @@ func startProxy() { // discovery, or to the static configuration for bootstrapped clusters. // Returns the local member on success. func setupCluster() (*etcdserver.Member, error) { + cluster = etcdserver.NewCluster(*initialClusterName) + cluster.SetMembersFromString(*initialCluster) + set := make(map[string]bool) - var m *etcdserver.Member flag.Visit(func(f *flag.Flag) { set[f.Name] = true }) if set["discovery"] && set["initial-cluster"] { return nil, fmt.Errorf("both discovery and bootstrap-config are set") } - - apurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-peer-urls", "addr", peerTLSInfo) + apurls, err := pkg.URLsFromFlags(fs, "advertise-peer-urls", "addr", peerTLSInfo) if err != nil { return nil, err } if set["discovery"] { - m = etcdserver.NewMemberFromURLs(*name, apurls, durl) cluster = etcdserver.NewCluster(*durl) - cluster.Add(*m) - return m, nil + return cluster.AddMemberFromURLs(*name, apurls) } else if set["initial-cluster"] { // We're statically configured, and cluster has appropriately been set. // Try to configure by indexing the static cluster by name. @@ -299,18 +291,7 @@ func setupCluster() (*etcdserver.Member, error) { return c, nil } } - log.Println("etcd: cannot find the passed name", *name, "in initial-cluster. Trying advertise-peer-urls") - - // Try to configure by looking for a matching machine based on the peer urls. - m = etcdserver.NewMemberFromURLs(*name, apurls, initialClusterName) - for _, c := range cluster.Members() { - if c.ID == m.ID { - return c, nil - } - } - log.Println("etcd: Could not find a matching peer for the local instance in initial-cluster.") - return nil, fmt.Errorf("etcd: Bootstrapping a static cluster, but local name or local peer urls are not defined") + return nil, fmt.Errorf("etcd: cannot find the passed name %s in --initial-cluster bootstrap list.", *name) } - m = etcdserver.NewMemberFromURLs(*name, apurls, initialClusterName) - return m, nil + return cluster.FindName(*name), nil }