Merge pull request #2766 from yichengq/345

*: extract types.Cluster from etcdserver.Cluster
This commit is contained in:
Yicheng Qin 2015-05-12 15:52:24 -07:00
commit f4c51cb5a1
11 changed files with 316 additions and 376 deletions

View File

@ -23,7 +23,6 @@ import (
"os"
"path"
"reflect"
"strings"
"time"
"github.com/coreos/etcd/discovery"
@ -71,6 +70,7 @@ func Main() {
var stopped <-chan struct{}
// TODO: check whether fields are set instead of whether fields have default value
if cfg.name != defaultName && cfg.initialCluster == initialClusterFromName(defaultName) {
cfg.initialCluster = initialClusterFromName(cfg.name)
}
@ -116,7 +116,7 @@ func Main() {
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
func startEtcd(cfg *config) (<-chan struct{}, error) {
cls, err := setupCluster(cfg)
urlsmap, token, err := getPeerURLsMapAndToken(cfg)
if err != nil {
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
}
@ -171,21 +171,22 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
}
srvcfg := &etcdserver.ServerConfig{
Name: cfg.name,
ClientURLs: cfg.acurls,
PeerURLs: cfg.apurls,
DataDir: cfg.dir,
SnapCount: cfg.snapCount,
MaxSnapFiles: cfg.maxSnapFiles,
MaxWALFiles: cfg.maxWalFiles,
Cluster: cls,
DiscoveryURL: cfg.durl,
DiscoveryProxy: cfg.dproxy,
NewCluster: cfg.isNewCluster(),
ForceNewCluster: cfg.forceNewCluster,
Transport: pt,
TickMs: cfg.TickMs,
ElectionTicks: cfg.electionTicks(),
Name: cfg.name,
ClientURLs: cfg.acurls,
PeerURLs: cfg.apurls,
DataDir: cfg.dir,
SnapCount: cfg.snapCount,
MaxSnapFiles: cfg.maxSnapFiles,
MaxWALFiles: cfg.maxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.durl,
DiscoveryProxy: cfg.dproxy,
NewCluster: cfg.isNewCluster(),
ForceNewCluster: cfg.forceNewCluster,
Transport: pt,
TickMs: cfg.TickMs,
ElectionTicks: cfg.electionTicks(),
}
var s *etcdserver.EtcdServer
s, err = etcdserver.NewServer(srvcfg)
@ -222,7 +223,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
func startProxy(cfg *config) error {
cls, err := setupCluster(cfg)
urlsmap, _, err := getPeerURLsMapAndToken(cfg)
if err != nil {
return fmt.Errorf("error setting up initial cluster: %v", err)
}
@ -232,7 +233,7 @@ func startProxy(cfg *config) error {
if err != nil {
return err
}
if cls, err = etcdserver.NewClusterFromString(cfg.durl, s); err != nil {
if urlsmap, err = types.NewURLsMap(s); err != nil {
return err
}
}
@ -267,12 +268,13 @@ func startProxy(cfg *config) error {
peerURLs = urls.PeerURLs
log.Printf("proxy: using peer urls %v from cluster file ./%s", peerURLs, clusterfile)
case os.IsNotExist(err):
peerURLs = cls.PeerURLs()
peerURLs = urlsmap.URLs()
log.Printf("proxy: using peer urls %v ", peerURLs)
default:
return err
}
clientURLs := []string{}
uf := func() []string {
gcls, err := etcdserver.GetClusterFromRemotePeers(peerURLs, tr)
// TODO: remove the 2nd check when we fix GetClusterFromPeers
@ -282,33 +284,33 @@ func startProxy(cfg *config) error {
return []string{}
}
if len(gcls.Members()) == 0 {
return cls.ClientURLs()
return clientURLs
}
cls = gcls
clientURLs = gcls.ClientURLs()
urls := struct{ PeerURLs []string }{cls.PeerURLs()}
urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
b, err := json.Marshal(urls)
if err != nil {
log.Printf("proxy: error on marshal peer urls %s", err)
return cls.ClientURLs()
return clientURLs
}
err = ioutil.WriteFile(clusterfile+".bak", b, 0600)
if err != nil {
log.Printf("proxy: error on writing urls %s", err)
return cls.ClientURLs()
return clientURLs
}
err = os.Rename(clusterfile+".bak", clusterfile)
if err != nil {
log.Printf("proxy: error on updating clusterfile %s", err)
return cls.ClientURLs()
return clientURLs
}
if !reflect.DeepEqual(cls.PeerURLs(), peerURLs) {
log.Printf("proxy: updated peer urls in cluster file from %v to %v", peerURLs, cls.PeerURLs())
if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) {
log.Printf("proxy: updated peer urls in cluster file from %v to %v", peerURLs, gcls.PeerURLs())
}
peerURLs = cls.PeerURLs()
peerURLs = gcls.PeerURLs()
return cls.ClientURLs()
return clientURLs
}
ph := proxy.NewHandler(pt, uf)
ph = &cors.CORSHandler{
@ -335,35 +337,28 @@ func startProxy(cfg *config) error {
return nil
}
// setupCluster sets up an initial cluster definition for bootstrap or discovery.
func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
var cls *etcdserver.Cluster
var err error
// getPeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery.
func getPeerURLsMapAndToken(cfg *config) (urlsmap types.URLsMap, token string, err error) {
switch {
case cfg.durl != "":
urlsmap = types.URLsMap{}
// If using discovery, generate a temporary cluster based on
// self's advertised peer URLs
clusterStr := genClusterString(cfg.name, cfg.apurls)
cls, err = etcdserver.NewClusterFromString(cfg.durl, clusterStr)
urlsmap[cfg.name] = cfg.apurls
token = cfg.durl
case cfg.dnsCluster != "":
clusterStr, clusterToken, err := discovery.SRVGetCluster(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls)
var clusterStr string
clusterStr, token, err = discovery.SRVGetCluster(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls)
if err != nil {
return nil, err
return nil, "", err
}
cls, err = etcdserver.NewClusterFromString(clusterToken, clusterStr)
urlsmap, err = types.NewURLsMap(clusterStr)
default:
// We're statically configured, and cluster has appropriately been set.
cls, err = etcdserver.NewClusterFromString(cfg.initialClusterToken, cfg.initialCluster)
urlsmap, err = types.NewURLsMap(cfg.initialCluster)
token = cfg.initialClusterToken
}
return cls, err
}
func genClusterString(name string, urls types.URLs) string {
addrs := make([]string, 0)
for _, u := range urls {
addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
}
return strings.Join(addrs, ",")
return urlsmap, token, err
}
// identifyDataDirOrDie returns the type of the data dir.

View File

@ -1,45 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
import (
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
func TestGenClusterString(t *testing.T) {
tests := []struct {
token string
urls []string
wstr string
}{
{
"default", []string{"http://127.0.0.1:2379"},
"default=http://127.0.0.1:2379",
},
{
"node1", []string{"http://0.0.0.0:2379", "http://1.1.1.1:2379"},
"node1=http://0.0.0.0:2379,node1=http://1.1.1.1:2379",
},
}
for i, tt := range tests {
urls := testutil.MustNewURLs(t, tt.urls)
str := genClusterString(tt.token, urls)
if str != tt.wstr {
t.Errorf("#%d: cluster = %s, want %s", i, str, tt.wstr)
}
}
}

View File

@ -15,21 +15,21 @@
package etcdserver
import (
"bytes"
"crypto/sha1"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"net/url"
"path"
"sort"
"strings"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
@ -69,28 +69,15 @@ type Cluster struct {
removed map[types.ID]bool
}
// NewClusterFromString returns a Cluster instantiated from the given cluster token
// and cluster string, by parsing members from a set of discovery-formatted
// names-to-IPs, like:
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
func NewClusterFromString(token string, cluster string) (*Cluster, error) {
func NewCluster(token string, initial types.URLsMap) (*Cluster, error) {
c := newCluster(token)
v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
if err != nil {
return nil, err
}
for name, urls := range v {
if len(urls) == 0 || urls[0] == "" {
return nil, fmt.Errorf("Empty URL given for %q", name)
}
purls := &flags.URLsValue{}
if err := purls.Set(strings.Join(urls, ",")); err != nil {
return nil, err
}
m := NewMember(name, types.URLs(*purls), c.token, nil)
for name, urls := range initial {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
return nil, fmt.Errorf("Member exists with identical ID %v", m)
return nil, fmt.Errorf("member exists with identical ID %v", m)
}
if uint64(m.ID) == raft.None {
return nil, fmt.Errorf("cannot use %x as member id", raft.None)
}
c.members[m.ID] = m
}
@ -98,14 +85,6 @@ func NewClusterFromString(token string, cluster string) (*Cluster, error) {
return c, nil
}
func NewClusterFromStore(token string, st store.Store) *Cluster {
c := newCluster(token)
c.store = st
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
return c
}
func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
c := newCluster(token)
c.id = id
@ -209,14 +188,19 @@ func (c *Cluster) ClientURLs() []string {
func (c *Cluster) String() string {
c.Lock()
defer c.Unlock()
sl := []string{}
b := &bytes.Buffer{}
fmt.Fprintf(b, "{ClusterID:%s ", c.id)
var ms []string
for _, m := range c.members {
for _, u := range m.PeerURLs {
sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
}
ms = append(ms, fmt.Sprintf("%+v", m))
}
sort.Strings(sl)
return strings.Join(sl, ",")
fmt.Fprintf(b, "Members:[%s] ", strings.Join(ms, " "))
var ids []string
for id, _ := range c.removed {
ids = append(ids, fmt.Sprintf("%s", id))
}
fmt.Fprintf(b, "RemovedMemberIDs:[%s]}", strings.Join(ids, " "))
return b.String()
}
func (c *Cluster) genID() {
@ -371,20 +355,6 @@ func (c *Cluster) SetVersion(ver *semver.Version) {
c.version = ver
}
// Validate ensures that there is no identical urls in the cluster peer list
func (c *Cluster) Validate() error {
urlMap := make(map[string]bool)
for _, m := range c.Members() {
for _, url := range m.PeerURLs {
if urlMap[url] {
return fmt.Errorf("duplicate url %v in cluster config", url)
}
urlMap[url] = true
}
}
return nil
}
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
members := make(map[types.ID]*Member)
removed := make(map[types.ID]bool)

View File

@ -21,110 +21,12 @@ import (
"reflect"
"testing"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
func TestClusterFromString(t *testing.T) {
tests := []struct {
f string
mems []*Member
}{
{
"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
[]*Member{
newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil),
newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil),
},
},
}
for i, tt := range tests {
c, err := NewClusterFromString("abc", tt.f)
if err != nil {
t.Fatalf("#%d: unexpected new error: %v", i, err)
}
if c.token != "abc" {
t.Errorf("#%d: token = %v, want abc", i, c.token)
}
if !reflect.DeepEqual(c.Members(), tt.mems) {
t.Errorf("#%d: members = %+v, want %+v", i, c.Members(), tt.mems)
}
}
}
func TestClusterFromStringBad(t *testing.T) {
tests := []string{
// invalid URL
"%^",
// no URL defined for member
"mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
// bad URL for member
"default=http://localhost/",
// TODO(philips): anyone know of a 64 bit sha1 hash collision
// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
// the same url for two members
"mem1=http://128.193.4.20:2379,mem2=http://128.193.4.20:2379",
}
for i, tt := range tests {
if _, err := NewClusterFromString("abc", tt); err == nil {
t.Errorf("#%d: unexpected successful new, want err", i)
}
}
}
func TestClusterFromStore(t *testing.T) {
tests := []struct {
mems []*Member
ver *semver.Version
}{
{
[]*Member{newTestMember(1, nil, "", nil)},
semver.Must(semver.NewVersion("2.0.0")),
},
{
nil,
nil,
},
{
[]*Member{
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
},
semver.Must(semver.NewVersion("2.0.0")),
},
}
for i, tt := range tests {
st := store.New()
hc := newTestCluster(nil)
hc.SetStore(st)
for _, m := range tt.mems {
hc.AddMember(m)
}
if tt.ver != nil {
_, err := st.Set(path.Join(StoreClusterPrefix, "version"), false, tt.ver.String(), store.Permanent)
if err != nil {
t.Fatal(err)
}
}
c := NewClusterFromStore("abc", st)
if c.token != "abc" {
t.Errorf("#%d: token = %v, want %v", i, c.token, "abc")
}
if !reflect.DeepEqual(c.Members(), tt.mems) {
t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems)
}
if !reflect.DeepEqual(c.Version(), tt.ver) {
t.Errorf("#%d: ver = %v, want %v", i, c.Version(), tt.ver)
}
}
}
func TestClusterMember(t *testing.T) {
membs := []*Member{
newTestMember(1, nil, "node1", nil),
@ -589,49 +491,6 @@ func TestClusterMembers(t *testing.T) {
}
}
func TestClusterString(t *testing.T) {
cls := &Cluster{
members: map[types.ID]*Member{
1: newTestMember(
1,
[]string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"},
"abc",
nil,
),
2: newTestMember(
2,
[]string{"http://2.2.2.2:2222"},
"def",
nil,
),
3: newTestMember(
3,
[]string{"http://3.3.3.3:1234", "http://127.0.0.1:2380"},
"ghi",
nil,
),
// no PeerURLs = not included
4: newTestMember(
4,
[]string{},
"four",
nil,
),
5: newTestMember(
5,
nil,
"five",
nil,
),
},
}
w := "abc=http://0.0.0.0:0000,abc=http://1.1.1.1:1111,def=http://2.2.2.2:2222,ghi=http://127.0.0.1:2380,ghi=http://3.3.3.3:1234"
if g := cls.String(); g != w {
t.Fatalf("Cluster.String():\ngot %#v\nwant %#v", g, w)
}
}
func TestClusterRemoveMember(t *testing.T) {
st := &storeRecorder{}
c := newTestCluster(nil)

View File

@ -23,24 +23,24 @@ import (
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
)
// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
type ServerConfig struct {
Name string
DiscoveryURL string
DiscoveryProxy string
ClientURLs types.URLs
PeerURLs types.URLs
DataDir string
SnapCount uint64
MaxSnapFiles uint
MaxWALFiles uint
Cluster *Cluster
NewCluster bool
ForceNewCluster bool
Transport *http.Transport
Name string
DiscoveryURL string
DiscoveryProxy string
ClientURLs types.URLs
PeerURLs types.URLs
DataDir string
SnapCount uint64
MaxSnapFiles uint
MaxWALFiles uint
InitialPeerURLsMap types.URLsMap
InitialClusterToken string
NewCluster bool
ForceNewCluster bool
Transport *http.Transport
TickMs uint
ElectionTicks int
@ -52,10 +52,10 @@ func (c *ServerConfig) VerifyBootstrap() error {
if err := c.verifyLocalMember(true); err != nil {
return err
}
if err := c.Cluster.Validate(); err != nil {
return err
if checkDuplicateURL(c.InitialPeerURLsMap) {
return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
}
if c.Cluster.String() == "" && c.DiscoveryURL == "" {
if c.InitialPeerURLsMap.String() == "" && c.DiscoveryURL == "" {
return fmt.Errorf("initial cluster unset and no discovery URL found")
}
return nil
@ -70,8 +70,8 @@ func (c *ServerConfig) VerifyJoinExisting() error {
if err := c.verifyLocalMember(false); err != nil {
return err
}
if err := c.Cluster.Validate(); err != nil {
return err
if checkDuplicateURL(c.InitialPeerURLsMap) {
return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
}
if c.DiscoveryURL != "" {
return fmt.Errorf("discovery URL should not be set when joining existing initial cluster")
@ -83,21 +83,19 @@ func (c *ServerConfig) VerifyJoinExisting() error {
// cluster. If strict is set, it also verifies the configured member
// has the same peer urls as configured advertised peer urls.
func (c *ServerConfig) verifyLocalMember(strict bool) error {
m := c.Cluster.MemberByName(c.Name)
urls := c.InitialPeerURLsMap[c.Name]
// Make sure the cluster at least contains the local server.
if m == nil {
if urls == nil {
return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name)
}
if uint64(m.ID) == raft.None {
return fmt.Errorf("cannot use %x as member id", raft.None)
}
// Advertised peer URLs must match those in the cluster peer list
// TODO: Remove URLStringsEqual after improvement of using hostnames #2150 #2123
apurls := c.PeerURLs.StringSlice()
sort.Strings(apurls)
urls.Sort()
if strict {
if !netutil.URLStringsEqual(apurls, m.PeerURLs) {
if !netutil.URLStringsEqual(apurls, urls.StringSlice()) {
return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name)
}
}
@ -135,6 +133,20 @@ func (c *ServerConfig) print(initial bool) {
log.Printf("etcdserver: advertise client URLs = %s", c.ClientURLs)
if initial {
log.Printf("etcdserver: initial advertise peer URLs = %s", c.PeerURLs)
log.Printf("etcdserver: initial cluster = %s", c.Cluster)
log.Printf("etcdserver: initial cluster = %s", c.InitialPeerURLsMap)
}
}
func checkDuplicateURL(urlsmap types.URLsMap) bool {
um := make(map[string]bool)
for _, urls := range urlsmap {
for _, url := range urls {
u := url.String()
if um[u] {
return true
}
um[u] = true
}
}
return false
}

View File

@ -33,14 +33,10 @@ func mustNewURLs(t *testing.T, urls []string) []url.URL {
}
func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
cluster, err := NewClusterFromString("", "")
if err != nil {
t.Fatalf("NewClusterFromString error: %v", err)
}
c := &ServerConfig{
Name: "node1",
DiscoveryURL: "",
Cluster: cluster,
Name: "node1",
DiscoveryURL: "",
InitialPeerURLsMap: types.URLsMap{},
}
if err := c.VerifyBootstrap(); err == nil {
t.Errorf("err = nil, want not nil")
@ -48,16 +44,16 @@ func TestConfigVerifyBootstrapWithoutClusterAndDiscoveryURLFail(t *testing.T) {
}
func TestConfigVerifyExistingWithDiscoveryURLFail(t *testing.T) {
cluster, err := NewClusterFromString("", "node1=http://127.0.0.1:2380")
cluster, err := types.NewURLsMap("node1=http://127.0.0.1:2380")
if err != nil {
t.Fatalf("NewClusterFromString error: %v", err)
t.Fatalf("NewCluster error: %v", err)
}
c := &ServerConfig{
Name: "node1",
DiscoveryURL: "http://127.0.0.1:2379/abcdefg",
PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}),
Cluster: cluster,
NewCluster: false,
Name: "node1",
DiscoveryURL: "http://127.0.0.1:2379/abcdefg",
PeerURLs: mustNewURLs(t, []string{"http://127.0.0.1:2380"}),
InitialPeerURLsMap: cluster,
NewCluster: false,
}
if err := c.VerifyJoinExisting(); err == nil {
t.Errorf("err = nil, want not nil")
@ -130,20 +126,19 @@ func TestConfigVerifyLocalMember(t *testing.T) {
}
for i, tt := range tests {
cluster, err := NewClusterFromString("", tt.clusterSetting)
cluster, err := types.NewURLsMap(tt.clusterSetting)
if err != nil {
t.Fatalf("#%d: Got unexpected error: %v", i, err)
}
cfg := ServerConfig{
Name: "node1",
Cluster: cluster,
Name: "node1",
InitialPeerURLsMap: cluster,
}
if tt.apurls != nil {
cfg.PeerURLs = mustNewURLs(t, tt.apurls)
}
err = cfg.verifyLocalMember(tt.strict)
if (err == nil) && tt.shouldError {
t.Errorf("%#v", *cluster)
t.Errorf("#%d: Got no error where one was expected", i)
}
if (err != nil) && !tt.shouldError {

View File

@ -192,13 +192,13 @@ func (r *raftNode) resumeSending() {
p.Resume()
}
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error
member := cfg.Cluster.MemberByName(cfg.Name)
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
&pb.Metadata{
NodeID: uint64(member.ID),
ClusterID: uint64(cfg.Cluster.ID()),
ClusterID: uint64(cl.ID()),
},
)
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
@ -209,14 +209,14 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
}
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cfg.Cluster).Member(id))
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
log.Panicf("marshal member should never fail: %v", err)
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
log.Printf("etcdserver: start member %s in cluster %s", id, cl.ID())
s = raft.NewMemoryStorage()
c := &raft.Config{
ID: uint64(id),
@ -231,15 +231,16 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
return
}
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
cfg.Cluster.SetID(cid)
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
@ -256,16 +257,15 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N
}
n := raft.RestartNode(c)
raftStatus = n.Status
return id, n, s, w
return id, cl, n, s, w
}
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
cfg.Cluster.SetID(cid)
// discard the previously uncommitted entries
for i, ent := range ents {
@ -289,7 +289,9 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
st.Commit = ents[len(ents)-1].Index
}
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
@ -306,7 +308,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
}
n := raft.RestartNode(c)
raftStatus = n.Status
return id, n, s, w
return id, cl, n, s, w
}
// getIDs returns an ordered set of IDs included in the given snapshot and

View File

@ -178,6 +178,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
var n raft.Node
var s *raft.MemoryStorage
var id types.ID
var cl *Cluster
// Run the migrations.
dataVer, err := version.DetectDataDir(cfg.DataDir)
@ -197,41 +198,53 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cfg.Cluster, cfg.Name), cfg.Transport)
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), cfg.Transport)
if err != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
}
if err := ValidateClusterAndAssignIDs(cfg.Cluster, existingCluster); err != nil {
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
remotes = existingCluster.Members()
cfg.Cluster.SetID(existingCluster.id)
cfg.Cluster.SetStore(st)
cl.SetID(existingCluster.id)
cl.SetStore(st)
cfg.Print()
id, n, s, w = startNode(cfg, nil)
id, n, s, w = startNode(cfg, cl, nil)
case !haveWAL && cfg.NewCluster:
if err := cfg.VerifyBootstrap(); err != nil {
return nil, err
}
m := cfg.Cluster.MemberByName(cfg.Name)
if isMemberBootstrapped(cfg.Cluster, cfg.Name, cfg.Transport) {
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
m := cl.MemberByName(cfg.Name)
if isMemberBootstrapped(cl, cfg.Name, cfg.Transport) {
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
}
if cfg.ShouldDiscover() {
str, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.Cluster.String())
str, err := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
if err != nil {
return nil, err
}
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, str); err != nil {
urlsmap, err := types.NewURLsMap(str)
if err != nil {
return nil, err
}
if err := cfg.Cluster.Validate(); err != nil {
return nil, fmt.Errorf("bad discovery cluster: %v", err)
if checkDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
}
if cl, err = NewCluster(cfg.InitialClusterToken, urlsmap); err != nil {
return nil, err
}
}
cfg.Cluster.SetStore(st)
cl.SetStore(st)
cfg.PrintWithInitial()
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
case haveWAL:
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
return nil, fmt.Errorf("cannot write to data directory: %v", err)
@ -254,16 +267,17 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index)
}
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
cfg.Print()
if snapshot != nil {
log.Printf("etcdserver: loaded cluster information from store: %s", cfg.Cluster)
log.Printf("etcdserver: loaded cluster information from store: %s", cl)
}
if !cfg.ForceNewCluster {
id, n, s, w = restartNode(cfg, snapshot)
id, cl, n, s, w = restartNode(cfg, snapshot)
} else {
id, n, s, w = restartAsStandaloneNode(cfg, snapshot)
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
}
cl.SetStore(st)
cl.Recover()
default:
return nil, fmt.Errorf("unsupported bootstrap config")
}
@ -288,7 +302,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
},
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
Cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
@ -297,14 +311,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
// TODO: move transport initialization near the definition of remote
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
tr := rafthttp.NewTransporter(cfg.Transport, id, cl.ID(), srv, srv.errorc, sstats, lstats)
// add all remotes into transport
for _, m := range remotes {
if m.ID != id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range cfg.Cluster.Members() {
for _, m := range cl.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
}

View File

@ -301,7 +301,7 @@ type cluster struct {
Members []*member
}
func fillClusterForMembers(ms []*member, cName string) error {
func fillClusterForMembers(ms []*member) error {
addrs := make([]string, 0)
for _, m := range ms {
scheme := "http"
@ -315,7 +315,7 @@ func fillClusterForMembers(ms []*member, cName string) error {
clusterStr := strings.Join(addrs, ",")
var err error
for _, m := range ms {
m.Cluster, err = etcdserver.NewClusterFromString(cName, clusterStr)
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
return err
}
@ -330,7 +330,7 @@ func newCluster(t *testing.T, size int, usePeerTLS bool) *cluster {
ms[i] = mustNewMember(t, c.name(i), usePeerTLS)
}
c.Members = ms
if err := fillClusterForMembers(c.Members, clusterName); err != nil {
if err := fillClusterForMembers(c.Members); err != nil {
t.Fatal(err)
}
@ -420,7 +420,6 @@ func (c *cluster) HTTPMembers() []client.Member {
}
func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
clusterStr := c.Members[0].Cluster.String()
m := mustNewMember(t, c.name(rand.Int()), usePeerTLS)
scheme := "http"
if usePeerTLS {
@ -441,14 +440,11 @@ func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
c.waitMembersMatch(t, members)
for _, ln := range m.PeerListeners {
clusterStr += fmt.Sprintf(",%s=%s://%s", m.Name, scheme, ln.Addr().String())
}
var err error
m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
if err != nil {
t.Fatal(err)
m.InitialPeerURLsMap = types.URLsMap{}
for _, mm := range c.Members {
m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
}
m.InitialPeerURLsMap[m.Name] = m.PeerURLs
m.NewCluster = false
if err := m.Launch(); err != nil {
t.Fatal(err)
@ -645,10 +641,11 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
t.Fatal(err)
}
clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
t.Fatal(err)
}
m.InitialClusterToken = clusterName
m.NewCluster = true
m.Transport = mustNewTransport(t, m.PeerTLSInfo)
m.ElectionTicks = electionTicks
@ -675,12 +672,13 @@ func (m *member) Clone(t *testing.T) *member {
// this should never fail
panic(err)
}
clusterStr := m.Cluster.String()
mm.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
clusterStr := m.InitialPeerURLsMap.String()
mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
if err != nil {
// this should never fail
panic(err)
}
mm.InitialClusterToken = m.InitialClusterToken
mm.Transport = mustNewTransport(t, m.PeerTLSInfo)
mm.ElectionTicks = m.ElectionTicks
mm.PeerTLSInfo = m.PeerTLSInfo

71
pkg/types/urlsmap.go Normal file
View File

@ -0,0 +1,71 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"fmt"
"net/url"
"sort"
"strings"
)
type URLsMap map[string]URLs
// NewURLsMap returns a URLsMap instantiated from the given string,
// which consists of discovery-formatted names-to-URLs, like:
// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
func NewURLsMap(s string) (URLsMap, error) {
cl := URLsMap{}
v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
if err != nil {
return nil, err
}
for name, urls := range v {
if len(urls) == 0 || urls[0] == "" {
return nil, fmt.Errorf("empty URL given for %q", name)
}
us, err := NewURLs(urls)
if err != nil {
return nil, err
}
cl[name] = us
}
return cl, nil
}
// String returns NameURLPairs into discovery-formatted name-to-URLs sorted by name.
func (c URLsMap) String() string {
pairs := make([]string, 0)
for name, urls := range c {
for _, url := range urls {
pairs = append(pairs, fmt.Sprintf("%s=%s", name, url.String()))
}
}
sort.Strings(pairs)
return strings.Join(pairs, ",")
}
// URLs returns a list of all URLs.
// The returned list is sorted in ascending lexicographical order.
func (c URLsMap) URLs() []string {
urls := make([]string, 0)
for _, us := range c {
for _, u := range us {
urls = append(urls, u.String())
}
}
sort.Strings(urls)
return urls
}

69
pkg/types/urlsmap_test.go Normal file
View File

@ -0,0 +1,69 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"reflect"
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
func TestParseInitialCluster(t *testing.T) {
c, err := NewURLsMap("mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379")
if err != nil {
t.Fatalf("unexpected parse error: %v", err)
}
wc := URLsMap(map[string]URLs{
"mem1": testutil.MustNewURLs(t, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}),
"mem2": testutil.MustNewURLs(t, []string{"http://10.0.0.2:2379"}),
"default": testutil.MustNewURLs(t, []string{"http://127.0.0.1:2379"}),
})
if !reflect.DeepEqual(c, wc) {
t.Errorf("cluster = %+v, want %+v", c, wc)
}
}
func TestParseInitialClusterBad(t *testing.T) {
tests := []string{
// invalid URL
"%^",
// no URL defined for member
"mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
// bad URL for member
"default=http://localhost/",
}
for i, tt := range tests {
if _, err := NewURLsMap(tt); err == nil {
t.Errorf("#%d: unexpected successful parse, want err", i)
}
}
}
func TestNameURLPairsString(t *testing.T) {
cls := URLsMap(map[string]URLs{
"abc": testutil.MustNewURLs(t, []string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"}),
"def": testutil.MustNewURLs(t, []string{"http://2.2.2.2:2222"}),
"ghi": testutil.MustNewURLs(t, []string{"http://3.3.3.3:1234", "http://127.0.0.1:2380"}),
// no PeerURLs = not included
"four": testutil.MustNewURLs(t, []string{}),
"five": testutil.MustNewURLs(t, nil),
})
w := "abc=http://0.0.0.0:0000,abc=http://1.1.1.1:1111,def=http://2.2.2.2:2222,ghi=http://127.0.0.1:2380,ghi=http://3.3.3.3:1234"
if g := cls.String(); g != w {
t.Fatalf("NameURLPairs.String():\ngot %#v\nwant %#v", g, w)
}
}