Refactor Cluster to hold and add members.

This commit is contained in:
Barak Michener 2014-10-21 18:02:57 -04:00
parent 1347e3952f
commit 502a3c2460
8 changed files with 63 additions and 74 deletions

View File

@ -37,7 +37,17 @@ type Cluster struct {
} }
func NewCluster(clusterName string) *Cluster { func NewCluster(clusterName string) *Cluster {
return &Cluster{name: clusterName, members: make(map[uint64]*Member)} c := &Cluster{name: clusterName, members: make(map[uint64]*Member)}
return c
}
func (c Cluster) FindName(name string) *Member {
for _, m := range c.members {
if m.Name == name {
return m
}
}
return nil
} }
func (c Cluster) FindID(id uint64) *Member { func (c Cluster) FindID(id uint64) *Member {
@ -77,10 +87,10 @@ func (c Cluster) Pick(id uint64) string {
return "" return ""
} }
// Set parses command line sets of names to IPs formatted like: // AddMembersFromFlag parses a sets of names to IPs either from the command line or discovery formatted like:
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3 // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
func (c *Cluster) Set(s string) error { func (c *Cluster) SetMembersFromString(s string) error {
*c = *NewCluster(c.name) c.members = make(map[uint64]*Member)
v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1)) v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
if err != nil { if err != nil {
return err return err
@ -91,7 +101,7 @@ func (c *Cluster) Set(s string) error {
return fmt.Errorf("Empty URL given for %q", name) return fmt.Errorf("Empty URL given for %q", name)
} }
m := newMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), &c.name, nil) m := newMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
err := c.Add(*m) err := c.Add(*m)
if err != nil { if err != nil {
return err return err
@ -100,15 +110,21 @@ func (c *Cluster) Set(s string) error {
return nil return nil
} }
func (c *Cluster) AddMemberFromURLs(name string, urls types.URLs) (*Member, error) {
m := newMember(name, urls, c.name, nil)
err := c.Add(*m)
if err != nil {
return nil, err
}
return m, nil
}
func (c *Cluster) GenID(salt []byte) { func (c *Cluster) GenID(salt []byte) {
mIDs := c.MemberIDs() mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs)) b := make([]byte, 8*len(mIDs))
for i, id := range mIDs { for i, id := range mIDs {
binary.BigEndian.PutUint64(b[8*i:], id) binary.BigEndian.PutUint64(b[8*i:], id)
} }
if len(c.name) > 0 {
b = append(b, []byte(c.name)...)
}
b = append(b, salt...) b = append(b, salt...)
hash := sha1.Sum(b) hash := sha1.Sum(b)
c.id = binary.BigEndian.Uint64(hash[:8]) c.id = binary.BigEndian.Uint64(hash[:8])

View File

@ -187,7 +187,7 @@ func TestClusterSet(t *testing.T) {
} }
g := Cluster{} g := Cluster{}
g.Set(tt.f) g.SetMembersFromString(tt.f)
if g.String() != c.String() { if g.String() != c.String() {
t.Errorf("#%d: set = %v, want %v", i, g, c) t.Errorf("#%d: set = %v, want %v", i, g, c)
@ -233,7 +233,7 @@ func TestClusterSetBad(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
g := NewCluster("") g := NewCluster("")
if err := g.Set(tt); err == nil { if err := g.SetMembersFromString(tt); err == nil {
t.Errorf("#%d: set = %v, want err", i, tt) t.Errorf("#%d: set = %v, want err", i, tt)
} }
} }

View File

@ -45,7 +45,7 @@ func TestBootstrapConfigVerify(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
cluster := &Cluster{} cluster := &Cluster{}
err := cluster.Set(tt.clusterSetting) err := cluster.SetMembersFromString(tt.clusterSetting)
if err != nil && tt.shouldError { if err != nil && tt.shouldError {
continue continue
} }

View File

@ -49,7 +49,7 @@ type Member struct {
// newMember creates a Member without an ID and generates one based on the // newMember creates a Member without an ID and generates one based on the
// name, peer URLs. This is used for bootstrapping. // name, peer URLs. This is used for bootstrapping.
func newMember(name string, peerURLs types.URLs, clusterName *string, now *time.Time) *Member { func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
m := &Member{ m := &Member{
RaftAttributes: RaftAttributes{PeerURLs: peerURLs.StringSlice()}, RaftAttributes: RaftAttributes{PeerURLs: peerURLs.StringSlice()},
Attributes: Attributes{Name: name}, Attributes: Attributes{Name: name},
@ -61,8 +61,8 @@ func newMember(name string, peerURLs types.URLs, clusterName *string, now *time.
b = append(b, []byte(p)...) b = append(b, []byte(p)...)
} }
if clusterName != nil { if clusterName != "" {
b = append(b, []byte(*clusterName)...) b = append(b, []byte(clusterName)...)
} }
if now != nil { if now != nil {
@ -74,10 +74,6 @@ func newMember(name string, peerURLs types.URLs, clusterName *string, now *time.
return m return m
} }
func NewMemberFromURLs(name string, urls types.URLs, clusterName *string) *Member {
return newMember(name, urls, clusterName, nil)
}
func (m Member) storeKey() string { func (m Member) storeKey() string {
return path.Join(storeMembersPrefix, idAsHex(m.ID)) return path.Join(storeMembersPrefix, idAsHex(m.ID))
} }

View File

@ -31,22 +31,21 @@ func timeParse(value string) *time.Time {
} }
func TestMemberTime(t *testing.T) { func TestMemberTime(t *testing.T) {
var clusterName = "etcd"
tests := []struct { tests := []struct {
mem *Member mem *Member
id uint64 id uint64
}{ }{
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil, nil), 14544069596553697298}, {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", nil), 14544069596553697298},
// Same ID, different name (names shouldn't matter) // Same ID, different name (names shouldn't matter)
{newMember("memfoo", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil, nil), 14544069596553697298}, {newMember("memfoo", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", nil), 14544069596553697298},
// Same ID, different Time // Same ID, different Time
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, nil, timeParse("1984-12-23T15:04:05Z")), 2448790162483548276}, {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "", timeParse("1984-12-23T15:04:05Z")), 2448790162483548276},
// Different cluster name // Different cluster name
{newMember("mcm1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, &clusterName, timeParse("1984-12-23T15:04:05Z")), 6973882743191604649}, {newMember("mcm1", []url.URL{{Scheme: "http", Host: "10.0.0.8:2379"}}, "etcd", timeParse("1984-12-23T15:04:05Z")), 6973882743191604649},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, nil, timeParse("1984-12-23T15:04:05Z")), 1466075294948436910}, {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}}, "", timeParse("1984-12-23T15:04:05Z")), 1466075294948436910},
// Order shouldn't matter // Order shouldn't matter
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "10.0.0.2:2379"}}, nil, nil), 16552244735972308939}, {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "10.0.0.2:2379"}}, "", nil), 16552244735972308939},
{newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.2:2379"}, {Scheme: "http", Host: "10.0.0.1:2379"}}, nil, nil), 16552244735972308939}, {newMember("mem1", []url.URL{{Scheme: "http", Host: "10.0.0.2:2379"}, {Scheme: "http", Host: "10.0.0.1:2379"}}, "", nil), 16552244735972308939},
} }
for i, tt := range tests { for i, tt := range tests {
if tt.mem.ID != tt.id { if tt.mem.ID != tt.id {

View File

@ -188,7 +188,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
if err != nil { if err != nil {
log.Fatalf("etcdserver: %v", err) log.Fatalf("etcdserver: %v", err)
} }
if err = cfg.Cluster.Set(s); err != nil { if err = cfg.Cluster.SetMembersFromString(s); err != nil {
log.Fatalf("etcdserver: %v", err) log.Fatalf("etcdserver: %v", err)
} }
} }

View File

@ -21,10 +21,7 @@ import (
const ( const (
tickDuration = 5 * time.Millisecond tickDuration = 5 * time.Millisecond
) clusterName = "etcd"
var (
clusterName = "etcd"
) )
func init() { func init() {
@ -88,30 +85,29 @@ func (c *cluster) Launch(t *testing.T) {
} }
lns := make([]net.Listener, c.Size) lns := make([]net.Listener, c.Size)
bootstrapCfgs := make([]string, c.Size) clusterCfg := etcdserver.NewCluster(clusterName)
for i := 0; i < c.Size; i++ { for i := 0; i < c.Size; i++ {
l := newLocalListener(t) l := newLocalListener(t)
// each member claims only one peer listener // each member claims only one peer listener
lns[i] = l lns[i] = l
bootstrapCfgs[i] = fmt.Sprintf("%s=%s", c.name(i), "http://"+l.Addr().String()) listenUrls, err := types.NewURLs([]string{"http://" + l.Addr().String()})
} if err != nil {
clusterCfg := etcdserver.NewCluster(clusterName) t.Fatal(err)
if err := clusterCfg.Set(strings.Join(bootstrapCfgs, ",")); err != nil { }
t.Fatal(err) _, err = clusterCfg.AddMemberFromURLs(c.name(i), listenUrls)
if err != nil {
t.Fatal(err)
}
} }
var err error
for i := 0; i < c.Size; i++ { for i := 0; i < c.Size; i++ {
m := member{} m := member{}
m.PeerListeners = []net.Listener{lns[i]} m.PeerListeners = []net.Listener{lns[i]}
cln := newLocalListener(t) cln := newLocalListener(t)
m.ClientListeners = []net.Listener{cln} m.ClientListeners = []net.Listener{cln}
listenUrls, err := types.NewURLs([]string{"http://" + lns[i].Addr().String()}) m.NodeID = clusterCfg.FindName(c.name(i)).ID
if err != nil { m.Name = c.name(i)
t.Fatal(err)
}
localMember := *etcdserver.NewMemberFromURLs(c.name(i), listenUrls, &clusterName)
m.NodeID = localMember.ID
m.Name = localMember.Name
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -154,6 +150,7 @@ func newLocalListener(t *testing.T) net.Listener {
} }
type member struct { type member struct {
id uint64
etcdserver.ServerConfig etcdserver.ServerConfig
PeerListeners, ClientListeners []net.Listener PeerListeners, ClientListeners []net.Listener

41
main.go
View File

@ -47,8 +47,10 @@ var (
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = fs.Bool("version", false, "Print the version and exit") printVersion = fs.Bool("version", false, "Print the version and exit")
cluster = &etcdserver.Cluster{} initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping")
clusterState = new(etcdserver.ClusterState) initialClusterName = fs.String("initial-cluster-name", "etcd", "Initial name for the etcd cluster during bootstrap")
cluster = &etcdserver.Cluster{}
clusterState = new(etcdserver.ClusterState)
cors = &pkg.CORSInfo{} cors = &pkg.CORSInfo{}
proxyFlag = new(flagtypes.Proxy) proxyFlag = new(flagtypes.Proxy)
@ -56,8 +58,6 @@ var (
clientTLSInfo = transport.TLSInfo{} clientTLSInfo = transport.TLSInfo{}
peerTLSInfo = transport.TLSInfo{} peerTLSInfo = transport.TLSInfo{}
initialClusterName *string
ignored = []string{ ignored = []string{
"cluster-active-size", "cluster-active-size",
"cluster-remove-delay", "cluster-remove-delay",
@ -76,13 +76,6 @@ var (
) )
func init() { func init() {
fs.StringVar(initialClusterName, "initial-cluster-name", "etcd", "Initial name for the etcd cluster during bootstrap.")
fs.Var(cluster, "initial-cluster", "Initial cluster configuration for bootstrapping")
cluster = etcdserver.NewCluster(*initialClusterName)
if err := cluster.Set("default=http://localhost:2380,default=http://localhost:7001"); err != nil {
// Should never happen
log.Panic(err)
}
fs.Var(clusterState, "initial-cluster-state", "Initial cluster configuration for bootstrapping") fs.Var(clusterState, "initial-cluster-state", "Initial cluster configuration for bootstrapping")
clusterState.Set(etcdserver.ClusterStateValueNew) clusterState.Set(etcdserver.ClusterStateValueNew)
@ -273,24 +266,23 @@ func startProxy() {
// discovery, or to the static configuration for bootstrapped clusters. // discovery, or to the static configuration for bootstrapped clusters.
// Returns the local member on success. // Returns the local member on success.
func setupCluster() (*etcdserver.Member, error) { func setupCluster() (*etcdserver.Member, error) {
cluster = etcdserver.NewCluster(*initialClusterName)
cluster.SetMembersFromString(*initialCluster)
set := make(map[string]bool) set := make(map[string]bool)
var m *etcdserver.Member
flag.Visit(func(f *flag.Flag) { flag.Visit(func(f *flag.Flag) {
set[f.Name] = true set[f.Name] = true
}) })
if set["discovery"] && set["initial-cluster"] { if set["discovery"] && set["initial-cluster"] {
return nil, fmt.Errorf("both discovery and bootstrap-config are set") return nil, fmt.Errorf("both discovery and bootstrap-config are set")
} }
apurls, err := pkg.URLsFromFlags(fs, "advertise-peer-urls", "addr", peerTLSInfo)
apurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-peer-urls", "addr", peerTLSInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if set["discovery"] { if set["discovery"] {
m = etcdserver.NewMemberFromURLs(*name, apurls, durl)
cluster = etcdserver.NewCluster(*durl) cluster = etcdserver.NewCluster(*durl)
cluster.Add(*m) return cluster.AddMemberFromURLs(*name, apurls)
return m, nil
} else if set["initial-cluster"] { } else if set["initial-cluster"] {
// We're statically configured, and cluster has appropriately been set. // We're statically configured, and cluster has appropriately been set.
// Try to configure by indexing the static cluster by name. // Try to configure by indexing the static cluster by name.
@ -299,18 +291,7 @@ func setupCluster() (*etcdserver.Member, error) {
return c, nil return c, nil
} }
} }
log.Println("etcd: cannot find the passed name", *name, "in initial-cluster. Trying advertise-peer-urls") return nil, fmt.Errorf("etcd: cannot find the passed name %s in --initial-cluster bootstrap list.", *name)
// Try to configure by looking for a matching machine based on the peer urls.
m = etcdserver.NewMemberFromURLs(*name, apurls, initialClusterName)
for _, c := range cluster.Members() {
if c.ID == m.ID {
return c, nil
}
}
log.Println("etcd: Could not find a matching peer for the local instance in initial-cluster.")
return nil, fmt.Errorf("etcd: Bootstrapping a static cluster, but local name or local peer urls are not defined")
} }
m = etcdserver.NewMemberFromURLs(*name, apurls, initialClusterName) return cluster.FindName(*name), nil
return m, nil
} }