diff --git a/Documentation/0.5/clustering.md b/Documentation/0.5/clustering.md index 6a8c3eb69..d088dd9ec 100644 --- a/Documentation/0.5/clustering.md +++ b/Documentation/0.5/clustering.md @@ -10,7 +10,7 @@ This guide will walk you through configuring a three machine etcd cluster with t ## Static -As we know the cluster members, their addresses and the size of the cluster before starting we can use an offline bootstrap configuration. Each machine will get either the following command line or environment variables: +As we know the cluster members, their addresses and the size of the cluster before starting, we can use an offline bootstrap configuration by setting the `initial-cluster` flag. Each machine will get either the following command line or environment variables: ``` ETCD_INITIAL_CLUSTER="infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380" @@ -22,6 +22,8 @@ ETCD_INITIAL_CLUSTER_STATE=new -initial-cluster-state new ``` +Note that the URLs specified in `initial-cluster` are the _advertised peer URLs_, i.e. they should match the value of `initial-advertise-peer-urls` on the respective nodes. + If you are spinning up multiple clusters (or creating and destroying a single cluster) with same configuration for testing purpose, it is highly recommended that you specify a unique `initial-cluster-token` for the different clusters. By doing this, etcd can generate unique cluster IDs and member IDs for the clusters even if they otherwise have the exact same configuration. This can protect you from cross-cluster-interaction, which might corrupt your clusters. On each machine you would start etcd with these flags: @@ -45,11 +47,11 @@ $ etcd -name infra2 -initial-advertise-peer-urls https://10.0.1.12:2380 \ -initial-cluster-state new ``` -The command line parameters starting with `-initial-cluster` will be ignored on subsequent runs of etcd. You are free to remove the environment variables or command line flags after the initial bootstrap process. If you need to make changes to the configuration later see our guide on [runtime configuration](runtime-configuration.md). +The command line parameters starting with `-initial-cluster` will be ignored on subsequent runs of etcd. You are free to remove the environment variables or command line flags after the initial bootstrap process. If you need to make changes to the configuration later (for example, adding or removing members to/from the cluster), see the [runtime configuration](runtime-configuration.md) guide. ### Error Cases -In the following case we have not included our new host in the list of enumerated nodes. If this is a new cluster, the node must be added to the list of initial cluster members. +In the following example, we have not included our new host in the list of enumerated nodes. If this is a new cluster, the node _must_ be added to the list of initial cluster members. ``` $ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \ @@ -59,13 +61,13 @@ etcd: infra1 not listed in the initial cluster config exit 1 ``` -In this case we are attempting to map a node (infra0) on a different address (127.0.0.1:2380) than its enumerated address in the cluster list (10.0.1.10:2380). If this node is to listen on multiple addresses, all addresses must be reflected in the "initial-cluster" configuration directive. +In this example, we are attempting to map a node (infra0) on a different address (127.0.0.1:2380) than its enumerated address in the cluster list (10.0.1.10:2380). If this node is to listen on multiple addresses, all addresses _must_ be reflected in the "initial-cluster" configuration directive. ``` $ etcd -name infra0 -initial-advertise-peer-urls http://127.0.0.1:2380 \ -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \ -initial-cluster-state=new -etcd: infra0 has different advertised URLs in the cluster and advertised peer URLs list +etcd: error setting up initial cluster: infra0 has different advertised URLs in the cluster and advertised peer URLs list exit 1 ``` @@ -81,7 +83,7 @@ exit 1 ## Discovery -In a number of cases you might not know the IPs of your cluster peers ahead of time. This is common when utilizing cloud providers or when your network uses DHCP. In these cases you can use an existing etcd cluster to bootstrap a new one. We call this process "discovery". +In a number of cases, you might not know the IPs of your cluster peers ahead of time. This is common when utilizing cloud providers or when your network uses DHCP. In these cases, rather than specifying a static configuration, you can use an existing etcd cluster to bootstrap a new one. We call this process "discovery". ### Lifetime of a Discovery URL @@ -99,7 +101,7 @@ Discovery uses an existing cluster to bootstrap itself. If you are using your ow $ curl -X PUT https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83/_config/size -d value=3 ``` -By setting the size key to the URL, you create a discovery URL with expected-cluster-size of 3. +By setting the size key to the URL, you create a discovery URL with an expected cluster size of 3. If you bootstrap an etcd cluster using discovery service with more than the expected number of etcd members, the extra etcd processes will [fall back][fall-back] to being [proxies][proxy] by default. @@ -124,14 +126,14 @@ This will cause each member to register itself with the custom etcd discovery se ### Public discovery service -If you do not have access to an existing cluster you can use the public discovery service hosted at discovery.etcd.io. You can create a private discovery URL using the "new" endpoint like so: +If you do not have access to an existing cluster, you can use the public discovery service hosted at `discovery.etcd.io`. You can create a private discovery URL using the "new" endpoint like so: ``` $ curl https://discovery.etcd.io/new?size=3 https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de ``` -This will create the cluster with an initial expected size of 3 members. If you do not specify a size a default of 3 will be used. +This will create the cluster with an initial expected size of 3 members. If you do not specify a size, a default of 3 will be used. If you bootstrap an etcd cluster using discovery service with more than the expected number of etcd members, the extra etcd processes will [fall back][fall-back] to being [proxies][proxy] by default. @@ -169,6 +171,7 @@ You can use the environment variable `ETCD_DISCOVERY_PROXY` to cause etcd to use #### Discovery Server Errors + ``` $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \ -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de @@ -178,10 +181,13 @@ exit 1 #### User Errors +This error will occur if the discovery cluster already has the configured number of members, and `discovery-fallback` is explicitly disabled + ``` $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \ - -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de -etcd: error: the cluster using discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de has already started with all 5 members + -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de \ + -discovery-fallback exit +etcd: discovery: cluster is full exit 1 ``` @@ -193,7 +199,7 @@ ignored on this machine. ``` $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \ -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de -etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in /var/lib/etcd +etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in /var/lib/etcd ``` # 0.4 to 0.5+ Migration Guide diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index cdbe222f2..c5c603872 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -22,6 +22,7 @@ import ( "log" "net" "net/http" + "net/url" "os" "strings" @@ -188,7 +189,11 @@ func Main() { // startEtcd launches the etcd server and HTTP handlers for client/server communication. func startEtcd() error { - cls, err := setupCluster() + apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo) + if err != nil { + return err + } + cls, err := setupCluster(apurls) if err != nil { return fmt.Errorf("error setting up initial cluster: %v", err) } @@ -222,7 +227,7 @@ func startEtcd() error { plns := make([]net.Listener, 0) for _, u := range lpurls { var l net.Listener - l, err = transport.NewListener(u.Host, peerTLSInfo) + l, err = transport.NewListener(u.Host, u.Scheme, peerTLSInfo) if err != nil { return err } @@ -246,7 +251,7 @@ func startEtcd() error { clns := make([]net.Listener, 0) for _, u := range lcurls { var l net.Listener - l, err = transport.NewListener(u.Host, clientTLSInfo) + l, err = transport.NewListener(u.Host, u.Scheme, clientTLSInfo) if err != nil { return err } @@ -265,6 +270,7 @@ func startEtcd() error { cfg := &etcdserver.ServerConfig{ Name: *name, ClientURLs: acurls, + PeerURLs: apurls, DataDir: *dir, SnapCount: *snapCount, Cluster: cls, @@ -303,7 +309,11 @@ func startEtcd() error { // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. func startProxy() error { - cls, err := setupCluster() + apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo) + if err != nil { + return err + } + cls, err := setupCluster(apurls) if err != nil { return fmt.Errorf("error setting up initial cluster: %v", err) } @@ -349,7 +359,7 @@ func startProxy() error { } // Start a proxy server goroutine for each listen address for _, u := range lcurls { - l, err := transport.NewListener(u.Host, clientTLSInfo) + l, err := transport.NewListener(u.Host, u.Scheme, clientTLSInfo) if err != nil { return err } @@ -363,8 +373,8 @@ func startProxy() error { return nil } -// setupCluster sets up the cluster definition for bootstrap or discovery. -func setupCluster() (*etcdserver.Cluster, error) { +// setupCluster sets up an initial cluster definition for bootstrap or discovery. +func setupCluster(apurls []url.URL) (*etcdserver.Cluster, error) { set := make(map[string]bool) fs.Visit(func(f *flag.Flag) { set[f.Name] = true @@ -372,21 +382,18 @@ func setupCluster() (*etcdserver.Cluster, error) { if set["discovery"] && set["initial-cluster"] { return nil, fmt.Errorf("both discovery and bootstrap-config are set") } - apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo) - if err != nil { - return nil, err - } - var cls *etcdserver.Cluster + var err error switch { case set["discovery"]: + // If using discovery, generate a temporary cluster based on + // self's advertised peer URLs clusterStr := genClusterString(*name, apurls) cls, err = etcdserver.NewClusterFromString(*durl, clusterStr) case set["initial-cluster"]: fallthrough default: // We're statically configured, and cluster has appropriately been set. - // Try to configure by indexing the static cluster by name. cls, err = etcdserver.NewClusterFromString(*initialClusterToken, *initialCluster) } return cls, err diff --git a/etcdmain/etcd_test.go b/etcdmain/etcd_test.go index 504e3cacf..dff57f618 100644 --- a/etcdmain/etcd_test.go +++ b/etcdmain/etcd_test.go @@ -17,11 +17,20 @@ package etcdmain import ( + "net/url" "testing" "github.com/coreos/etcd/pkg/types" ) +func mustNewURLs(t *testing.T, urls []string) []url.URL { + u, err := types.NewURLs(urls) + if err != nil { + t.Fatalf("unexpected new urls error: %v", err) + } + return u +} + func TestGenClusterString(t *testing.T) { tests := []struct { token string @@ -38,10 +47,7 @@ func TestGenClusterString(t *testing.T) { }, } for i, tt := range tests { - urls, err := types.NewURLs(tt.urls) - if err != nil { - t.Fatalf("unexpected new urls error: %v", err) - } + urls := mustNewURLs(t, tt.urls) str := genClusterString(tt.token, urls) if str != tt.wstr { t.Errorf("#%d: cluster = %s, want %s", i, str, tt.wstr) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 461488f51..c31aca01d 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -42,11 +42,18 @@ const ( ) type ClusterInfo interface { + // ID returns the cluster ID ID() types.ID + // ClientURLs returns an aggregate set of all URLs on which this + // cluster is listening for client requests ClientURLs() []string // Members returns a slice of members sorted by their ID Members() []*Member + // Member retrieves a particular member based on ID, or nil if the + // member does not exist in the cluster Member(id types.ID) *Member + // IsIDRemoved checks whether the given ID has been removed from this + // cluster at some point in the past IsIDRemoved(id types.ID) bool } @@ -62,8 +69,9 @@ type Cluster struct { sync.Mutex } -// NewClusterFromString returns Cluster through given cluster token and parsing -// members from a sets of names to IPs discovery formatted like: +// NewClusterFromString returns a Cluster instantiated from the given cluster token +// and cluster string, by parsing members from a set of discovery-formatted +// names-to-IPs, like: // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4 func NewClusterFromString(token string, cluster string) (*Cluster, error) { c := newCluster(token) @@ -263,12 +271,13 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st } // ensures that it is still valid. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { members, removed := membersFromStore(c.store) - if removed[types.ID(cc.NodeID)] { + id := types.ID(cc.NodeID) + if removed[id] { return ErrIDRemoved } switch cc.Type { case raftpb.ConfChangeAddNode: - if members[types.ID(cc.NodeID)] != nil { + if members[id] != nil { return ErrIDExists } urls := make(map[string]bool) @@ -287,16 +296,39 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } } case raftpb.ConfChangeRemoveNode: - if members[types.ID(cc.NodeID)] == nil { + if members[id] == nil { return ErrIDNotFound } + case raftpb.ConfChangeUpdateNode: + if members[id] == nil { + return ErrIDNotFound + } + urls := make(map[string]bool) + for _, m := range members { + if m.ID == id { + continue + } + for _, u := range m.PeerURLs { + urls[u] = true + } + } + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + for _, u := range m.PeerURLs { + if urls[u] { + return ErrPeerURLexists + } + } default: - log.Panicf("ConfChange type should be either AddNode or RemoveNode") + log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode") } return nil } -// AddMember puts a new Member into the store. +// AddMember adds a new Member into the cluster, and saves the given member's +// raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. func (c *Cluster) AddMember(m *Member) { c.Lock() @@ -309,14 +341,6 @@ func (c *Cluster) AddMember(m *Member) { if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { log.Panicf("create raftAttributes should never fail: %v", err) } - b, err = json.Marshal(m.Attributes) - if err != nil { - log.Panicf("marshal attributes should never fail: %v", err) - } - p = path.Join(memberStoreKey(m.ID), attributesSuffix) - if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { - log.Panicf("create attributes should never fail: %v", err) - } c.members[m.ID] = m } @@ -341,24 +365,44 @@ func (c *Cluster) UpdateMemberAttributes(id types.ID, attr Attributes) { c.members[id].Attributes = attr } +func (c *Cluster) UpdateMember(nm *Member) { + c.Lock() + defer c.Unlock() + b, err := json.Marshal(nm.RaftAttributes) + if err != nil { + log.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(memberStoreKey(nm.ID), raftAttributesSuffix) + if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { + log.Panicf("update raftAttributes should never fail: %v", err) + } + c.members[nm.ID].RaftAttributes = nm.RaftAttributes +} + // nodeToMember builds member through a store node. // the child nodes of the given node should be sorted by key. func nodeToMember(n *store.NodeExtern) (*Member, error) { m := &Member{ID: mustParseMemberIDFromKey(n.Key)} - if len(n.Nodes) != 2 { - return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes)) + attrs := make(map[string][]byte) + raftAttrKey := path.Join(n.Key, raftAttributesSuffix) + attrKey := path.Join(n.Key, attributesSuffix) + for _, nn := range n.Nodes { + if nn.Key != raftAttrKey && nn.Key != attrKey { + return nil, fmt.Errorf("unknown key %q", nn.Key) + } + attrs[nn.Key] = []byte(*nn.Value) } - if w := path.Join(n.Key, attributesSuffix); n.Nodes[0].Key != w { - return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w) + if data := attrs[raftAttrKey]; data != nil { + if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { + return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) + } + } else { + return nil, fmt.Errorf("raftAttributes key doesn't exist") } - if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil { - return m, fmt.Errorf("unmarshal attributes error: %v", err) - } - if w := path.Join(n.Key, raftAttributesSuffix); n.Nodes[1].Key != w { - return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w) - } - if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil { - return m, fmt.Errorf("unmarshal raftAttributes error: %v", err) + if data := attrs[attrKey]; data != nil { + if err := json.Unmarshal(data, &m.Attributes); err != nil { + return m, fmt.Errorf("unmarshal attributes error: %v", err) + } } return m, nil } diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 9fe6ca348..77323fe33 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -31,13 +31,13 @@ import ( func TestClusterFromString(t *testing.T) { tests := []struct { f string - mems []Member + mems []*Member }{ { "mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379", - []Member{ - newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), + []*Member{ newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil), + newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil), }, }, @@ -50,9 +50,8 @@ func TestClusterFromString(t *testing.T) { if c.token != "abc" { t.Errorf("#%d: token = %v, want abc", i, c.token) } - wc := newTestCluster(tt.mems) - if !reflect.DeepEqual(c.members, wc.members) { - t.Errorf("#%d: members = %+v, want %+v", i, c.members, wc.members) + if !reflect.DeepEqual(c.Members(), tt.mems) { + t.Errorf("#%d: members = %+v, want %+v", i, c.Members(), tt.mems) } } } @@ -80,39 +79,39 @@ func TestClusterFromStringBad(t *testing.T) { func TestClusterFromStore(t *testing.T) { tests := []struct { - mems []Member + mems []*Member }{ { - []Member{newTestMember(1, nil, "node1", nil)}, + []*Member{newTestMember(1, nil, "", nil)}, }, { - []Member{}, + nil, }, { - []Member{ - newTestMember(1, nil, "node1", nil), - newTestMember(2, nil, "node2", nil), + []*Member{ + newTestMember(1, nil, "", nil), + newTestMember(2, nil, "", nil), }, }, } for i, tt := range tests { hc := newTestCluster(nil) + hc.SetStore(store.New()) for _, m := range tt.mems { - hc.AddMember(&m) + hc.AddMember(m) } c := NewClusterFromStore("abc", hc.store) if c.token != "abc" { t.Errorf("#%d: token = %v, want %v", i, c.token, "abc") } - wc := newTestCluster(tt.mems) - if !reflect.DeepEqual(c.members, wc.members) { - t.Errorf("#%d: members = %v, want %v", i, c.members, wc.members) + if !reflect.DeepEqual(c.Members(), tt.mems) { + t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems) } } } func TestClusterMember(t *testing.T) { - membs := []Member{ + membs := []*Member{ newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil), } @@ -137,7 +136,7 @@ func TestClusterMember(t *testing.T) { } func TestClusterMemberByName(t *testing.T) { - membs := []Member{ + membs := []*Member{ newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil), } @@ -162,7 +161,7 @@ func TestClusterMemberByName(t *testing.T) { } func TestClusterMemberIDs(t *testing.T) { - c := newTestCluster([]Member{ + c := newTestCluster([]*Member{ newTestMember(1, nil, "", nil), newTestMember(4, nil, "", nil), newTestMember(100, nil, "", nil), @@ -176,12 +175,12 @@ func TestClusterMemberIDs(t *testing.T) { func TestClusterPeerURLs(t *testing.T) { tests := []struct { - mems []Member + mems []*Member wurls []string }{ // single peer with a single address { - mems: []Member{ + mems: []*Member{ newTestMember(1, []string{"http://192.0.2.1"}, "", nil), }, wurls: []string{"http://192.0.2.1"}, @@ -189,7 +188,7 @@ func TestClusterPeerURLs(t *testing.T) { // single peer with a single address with a port { - mems: []Member{ + mems: []*Member{ newTestMember(1, []string{"http://192.0.2.1:8001"}, "", nil), }, wurls: []string{"http://192.0.2.1:8001"}, @@ -197,7 +196,7 @@ func TestClusterPeerURLs(t *testing.T) { // several members explicitly unsorted { - mems: []Member{ + mems: []*Member{ newTestMember(2, []string{"http://192.0.2.3", "http://192.0.2.4"}, "", nil), newTestMember(3, []string{"http://192.0.2.5", "http://192.0.2.6"}, "", nil), newTestMember(1, []string{"http://192.0.2.1", "http://192.0.2.2"}, "", nil), @@ -207,13 +206,13 @@ func TestClusterPeerURLs(t *testing.T) { // no members { - mems: []Member{}, + mems: []*Member{}, wurls: []string{}, }, // peer with no peer urls { - mems: []Member{ + mems: []*Member{ newTestMember(3, []string{}, "", nil), }, wurls: []string{}, @@ -231,12 +230,12 @@ func TestClusterPeerURLs(t *testing.T) { func TestClusterClientURLs(t *testing.T) { tests := []struct { - mems []Member + mems []*Member wurls []string }{ // single peer with a single address { - mems: []Member{ + mems: []*Member{ newTestMember(1, nil, "", []string{"http://192.0.2.1"}), }, wurls: []string{"http://192.0.2.1"}, @@ -244,7 +243,7 @@ func TestClusterClientURLs(t *testing.T) { // single peer with a single address with a port { - mems: []Member{ + mems: []*Member{ newTestMember(1, nil, "", []string{"http://192.0.2.1:8001"}), }, wurls: []string{"http://192.0.2.1:8001"}, @@ -252,7 +251,7 @@ func TestClusterClientURLs(t *testing.T) { // several members explicitly unsorted { - mems: []Member{ + mems: []*Member{ newTestMember(2, nil, "", []string{"http://192.0.2.3", "http://192.0.2.4"}), newTestMember(3, nil, "", []string{"http://192.0.2.5", "http://192.0.2.6"}), newTestMember(1, nil, "", []string{"http://192.0.2.1", "http://192.0.2.2"}), @@ -262,13 +261,13 @@ func TestClusterClientURLs(t *testing.T) { // no members { - mems: []Member{}, + mems: []*Member{}, wurls: []string{}, }, // peer with no client urls { - mems: []Member{ + mems: []*Member{ newTestMember(3, nil, "", []string{}), }, wurls: []string{}, @@ -286,34 +285,34 @@ func TestClusterClientURLs(t *testing.T) { func TestClusterValidateAndAssignIDsBad(t *testing.T) { tests := []struct { - clmembs []Member + clmembs []*Member membs []*Member }{ { // unmatched length - []Member{ + []*Member{ newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), }, []*Member{}, }, { // unmatched peer urls - []Member{ + []*Member{ newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), }, []*Member{ - newTestMemberp(1, []string{"http://127.0.0.1:4001"}, "", nil), + newTestMember(1, []string{"http://127.0.0.1:4001"}, "", nil), }, }, { // unmatched peer urls - []Member{ + []*Member{ newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), }, []*Member{ - newTestMemberp(1, []string{"http://127.0.0.1:2379"}, "", nil), - newTestMemberp(2, []string{"http://127.0.0.2:4001"}, "", nil), + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(2, []string{"http://127.0.0.2:4001"}, "", nil), }, }, } @@ -327,18 +326,18 @@ func TestClusterValidateAndAssignIDsBad(t *testing.T) { func TestClusterValidateAndAssignIDs(t *testing.T) { tests := []struct { - clmembs []Member + clmembs []*Member membs []*Member wids []types.ID }{ { - []Member{ + []*Member{ newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), }, []*Member{ - newTestMemberp(3, []string{"http://127.0.0.1:2379"}, "", nil), - newTestMemberp(4, []string{"http://127.0.0.2:2379"}, "", nil), + newTestMember(3, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(4, []string{"http://127.0.0.2:2379"}, "", nil), }, []types.ID{3, 4}, }, @@ -364,7 +363,25 @@ func TestClusterValidateConfigurationChange(t *testing.T) { cl.RemoveMember(4) attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} - cxt, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 3)}} + ctx2to3, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx2to5, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) if err != nil { t.Fatal(err) } @@ -405,7 +422,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) { raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, NodeID: 5, - Context: cxt, + Context: ctx, }, ErrPeerURLexists, }, @@ -416,6 +433,39 @@ func TestClusterValidateConfigurationChange(t *testing.T) { }, ErrIDNotFound, }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: 5, + Context: ctx5, + }, + nil, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 5, + Context: ctx, + }, + ErrIDNotFound, + }, + // try to change the peer url of 2 to the peer url of 3 + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to3, + }, + ErrPeerURLexists, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to5, + }, + nil, + }, } for i, tt := range tests { err := cl.ValidateConfigurationChange(tt.cc) @@ -426,7 +476,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) { } func TestClusterGenID(t *testing.T) { - cs := newTestCluster([]Member{ + cs := newTestCluster([]*Member{ newTestMember(1, nil, "", nil), newTestMember(2, nil, "", nil), }) @@ -438,7 +488,7 @@ func TestClusterGenID(t *testing.T) { previd := cs.ID() cs.SetStore(&storeRecorder{}) - cs.AddMember(newTestMemberp(3, nil, "", nil)) + cs.AddMember(newTestMember(3, nil, "", nil)) cs.genID() if cs.ID() == previd { t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) @@ -451,22 +501,22 @@ func TestNodeToMemberBad(t *testing.T) { {Key: "/1234/strange"}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp("garbage")}, + {Key: "/1234/raftAttributes", Value: stringp("garbage")}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/attributes", Value: stringp(`{"name":"node1","clientURLs":null}`)}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/raftAttributes", Value: stringp(`{"peerURLs":null}`)}, {Key: "/1234/strange"}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, - {Key: "/1234/static", Value: stringp("garbage")}, + {Key: "/1234/raftAttributes", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/attributes", Value: stringp("garbage")}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, - {Key: "/1234/static", Value: stringp(`{"name":"node1","clientURLs":null}`)}, + {Key: "/1234/raftAttributes", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/attributes", Value: stringp(`{"name":"node1","clientURLs":null}`)}, {Key: "/1234/strange"}, }}, } @@ -481,7 +531,7 @@ func TestClusterAddMember(t *testing.T) { st := &storeRecorder{} c := newTestCluster(nil) c.SetStore(st) - c.AddMember(newTestMemberp(1, nil, "node1", nil)) + c.AddMember(newTestMember(1, nil, "node1", nil)) wactions := []action{ { @@ -494,16 +544,6 @@ func TestClusterAddMember(t *testing.T) { store.Permanent, }, }, - { - name: "Create", - params: []interface{}{ - path.Join(storeMembersPrefix, "1", "attributes"), - false, - `{"name":"node1"}`, - false, - store.Permanent, - }, - }, } if g := st.Action(); !reflect.DeepEqual(g, wactions) { t.Errorf("actions = %v, want %v", g, wactions) @@ -535,32 +575,32 @@ func TestClusterMembers(t *testing.T) { func TestClusterString(t *testing.T) { cls := &Cluster{ members: map[types.ID]*Member{ - 1: newTestMemberp( + 1: newTestMember( 1, []string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"}, "abc", nil, ), - 2: newTestMemberp( + 2: newTestMember( 2, []string{"http://2.2.2.2:2222"}, "def", nil, ), - 3: newTestMemberp( + 3: newTestMember( 3, []string{"http://3.3.3.3:1234", "http://127.0.0.1:7001"}, "ghi", nil, ), // no PeerURLs = not included - 4: newTestMemberp( + 4: newTestMember( 4, []string{}, "four", nil, ), - 5: newTestMemberp( + 5: newTestMember( 5, nil, "five", @@ -605,24 +645,10 @@ func TestNodeToMember(t *testing.T) { } } -func newTestCluster(membs []Member) *Cluster { +func newTestCluster(membs []*Member) *Cluster { c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} - c.store = store.New() - for i := range membs { - c.AddMember(&membs[i]) + for _, m := range membs { + c.members[m.ID] = m } return c } - -func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member { - return Member{ - ID: types.ID(id), - RaftAttributes: RaftAttributes{PeerURLs: peerURLs}, - Attributes: Attributes{Name: name, ClientURLs: clientURLs}, - } -} - -func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member { - m := newTestMember(id, peerURLs, name, clientURLs) - return &m -} diff --git a/etcdserver/config.go b/etcdserver/config.go index 35b371048..6a434b0d3 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -20,6 +20,8 @@ import ( "fmt" "net/http" "path" + "reflect" + "sort" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -31,6 +33,7 @@ type ServerConfig struct { DiscoveryURL string DiscoveryProxy string ClientURLs types.URLs + PeerURLs types.URLs DataDir string SnapCount uint64 Cluster *Cluster @@ -45,7 +48,7 @@ func (c *ServerConfig) VerifyBootstrapConfig() error { m := c.Cluster.MemberByName(c.Name) // Make sure the cluster at least contains the local server. if m == nil { - return fmt.Errorf("couldn't find local name %s in the initial cluster configuration", c.Name) + return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name) } if uint64(m.ID) == raft.None { return fmt.Errorf("cannot use %x as member id", raft.None) @@ -65,6 +68,13 @@ func (c *ServerConfig) VerifyBootstrapConfig() error { urlMap[url] = true } } + + // Advertised peer URLs must match those in the cluster peer list + apurls := c.PeerURLs.StringSlice() + sort.Strings(apurls) + if !reflect.DeepEqual(apurls, m.PeerURLs) { + return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name) + } return nil } diff --git a/etcdserver/config_test.go b/etcdserver/config_test.go index d5bda587d..bbfb59882 100644 --- a/etcdserver/config_test.go +++ b/etcdserver/config_test.go @@ -16,12 +16,26 @@ package etcdserver -import "testing" +import ( + "net/url" + "testing" + + "github.com/coreos/etcd/pkg/types" +) + +func mustNewURLs(t *testing.T, urls []string) []url.URL { + u, err := types.NewURLs(urls) + if err != nil { + t.Fatalf("error creating new URLs from %q: %v", urls, err) + } + return u +} func TestBootstrapConfigVerify(t *testing.T) { tests := []struct { clusterSetting string newclst bool + apurls []string disc string shouldError bool }{ @@ -29,35 +43,63 @@ func TestBootstrapConfigVerify(t *testing.T) { // Node must exist in cluster "", true, + nil, "", + true, }, { // Cannot have duplicate URLs in cluster config "node1=http://localhost:7001,node2=http://localhost:7001,node2=http://localhost:7002", true, + nil, "", + true, }, { // Node defined, ClusterState OK "node1=http://localhost:7001,node2=http://localhost:7002", true, + []string{"http://localhost:7001"}, "", + false, }, { // Node defined, discovery OK "node1=http://localhost:7001", false, + []string{"http://localhost:7001"}, "http://discovery", + false, }, { // Cannot have ClusterState!=new && !discovery "node1=http://localhost:7001", false, + nil, "", + + true, + }, + { + // Advertised peer URLs must match those in cluster-state + "node1=http://localhost:7001", + true, + []string{"http://localhost:12345"}, + "", + + true, + }, + { + // Advertised peer URLs must match those in cluster-state + "node1=http://localhost:7001,node1=http://localhost:12345", + true, + []string{"http://localhost:12345"}, + "", + true, }, } @@ -67,13 +109,15 @@ func TestBootstrapConfigVerify(t *testing.T) { if err != nil { t.Fatalf("#%d: Got unexpected error: %v", i, err) } - cfg := ServerConfig{ Name: "node1", DiscoveryURL: tt.disc, Cluster: cluster, NewCluster: tt.newclst, } + if tt.apurls != nil { + cfg.PeerURLs = mustNewURLs(t, tt.apurls) + } err = cfg.VerifyBootstrapConfig() if (err == nil) && tt.shouldError { t.Errorf("%#v", *cluster) diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 576c61431..e9e47ed7a 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -148,7 +148,7 @@ type membersHandler struct { } func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET", "POST", "DELETE") { + if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { return } w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) @@ -168,25 +168,13 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "POST": - ctype := r.Header.Get("Content-Type") - if ctype != "application/json" { - writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) - return - } - b, err := ioutil.ReadAll(r.Body) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) - return - } req := httptypes.MemberCreateRequest{} - if err := json.Unmarshal(b, &req); err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + if ok := unmarshalRequest(r, &req, w); !ok { return } - now := h.clock.Now() m := etcdserver.NewMember("", req.PeerURLs, "", &now) - err = h.server.AddMember(ctx, *m) + err := h.server.AddMember(ctx, *m) switch { case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists: writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) @@ -203,28 +191,47 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "DELETE": - idStr := trimPrefix(r.URL.Path, membersPrefix) - if idStr == "" { - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + id, ok := getID(r.URL.Path, w) + if !ok { return } - id, err := types.IDFromString(idStr) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) - return - } - err = h.server.RemoveMember(ctx, uint64(id)) + err := h.server.RemoveMember(ctx, uint64(id)) switch { case err == etcdserver.ErrIDRemoved: - writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) case err == etcdserver.ErrIDNotFound: - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) case err != nil: log.Printf("etcdhttp: error removing node %s: %v", id, err) writeError(w, err) default: w.WriteHeader(http.StatusNoContent) } + case "PUT": + id, ok := getID(r.URL.Path, w) + if !ok { + return + } + req := httptypes.MemberUpdateRequest{} + if ok := unmarshalRequest(r, &req, w); !ok { + return + } + m := etcdserver.Member{ + ID: id, + RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, + } + err := h.server.UpdateMember(ctx, m) + switch { + case err == etcdserver.ErrPeerURLexists: + writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) + case err == etcdserver.ErrIDNotFound: + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) + case err != nil: + log.Printf("etcdhttp: error updating node %s: %v", m.ID, err) + writeError(w, err) + default: + w.WriteHeader(http.StatusNoContent) + } } } @@ -506,6 +513,38 @@ func trimErrorPrefix(err error, prefix string) error { return err } +func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { + ctype := r.Header.Get("Content-Type") + if ctype != "application/json" { + writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) + return false + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + if err := req.UnmarshalJSON(b); err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + return true +} + +func getID(p string, w http.ResponseWriter) (types.ID, bool) { + idStr := trimPrefix(p, membersPrefix) + if idStr == "" { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return 0, false + } + id, err := types.IDFromString(idStr) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + return 0, false + } + return id, true +} + // getUint64 extracts a uint64 by the given key from a Form. If the key does // not exist in the form, 0 is returned. If the key exists but the value is // badly formed, an error is returned. If multiple values are present only the diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go index 78e0a4e0a..bf4f21e6c 100644 --- a/etcdserver/etcdhttp/client_test.go +++ b/etcdserver/etcdhttp/client_test.go @@ -111,6 +111,11 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { return nil } +func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error { + s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}}) + return nil +} + type action struct { name string params []interface{} @@ -136,11 +141,12 @@ type resServer struct { func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { return rs.res, nil } -func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} -func (rs *resServer) Stop() {} -func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } -func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } +func (rs *resServer) Start() {} +func (rs *resServer) Stop() {} +func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil } func boolp(b bool) *bool { return &b } @@ -698,6 +704,48 @@ func TestServeMembersDelete(t *testing.T) { } } +func TestServeMembersUpdate(t *testing.T) { + u := mustNewURL(t, path.Join(membersPrefix, "1")) + b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`) + req, err := http.NewRequest("PUT", u.String(), bytes.NewReader(b)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + s := &serverRecorder{} + h := &membersHandler{ + server: s, + clock: clockwork.NewFakeClock(), + clusterInfo: &fakeCluster{id: 1}, + } + rw := httptest.NewRecorder() + + h.ServeHTTP(rw, req) + + wcode := http.StatusNoContent + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + + gcid := rw.Header().Get("X-Etcd-Cluster-ID") + wcid := h.clusterInfo.ID().String() + if gcid != wcid { + t.Errorf("cid = %s, want %s", gcid, wcid) + } + + wm := etcdserver.Member{ + ID: 1, + RaftAttributes: etcdserver.RaftAttributes{ + PeerURLs: []string{"http://127.0.0.1:1"}, + }, + } + + wactions := []action{{name: "UpdateMember", params: []interface{}{wm}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + func TestServeMembersFail(t *testing.T) { tests := []struct { req *http.Request @@ -855,6 +903,104 @@ func TestServeMembersFail(t *testing.T) { }, nil, + http.StatusMethodNotAllowed, + }, + { + // parse body error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader("bad json")), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &resServer{}, + + http.StatusBadRequest, + }, + { + // bad content type + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/bad"}}, + }, + &errServer{}, + + http.StatusUnsupportedMediaType, + }, + { + // bad url + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{}, + + http.StatusBadRequest, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrPeerURLexists, + }, + + http.StatusConflict, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrIDNotFound, + }, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember error with badly formed ID + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "bad_id")), + Method: "PUT", + }, + nil, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember with no ID + &http.Request{ + URL: mustNewURL(t, membersPrefix), + Method: "PUT", + }, + nil, + http.StatusMethodNotAllowed, }, } @@ -995,6 +1141,43 @@ func TestServeMachines(t *testing.T) { } } +func TestGetID(t *testing.T) { + tests := []struct { + path string + + wok bool + wid types.ID + wcode int + }{ + { + "123", + true, 0x123, http.StatusOK, + }, + { + "bad_id", + false, 0, http.StatusNotFound, + }, + { + "", + false, 0, http.StatusMethodNotAllowed, + }, + } + + for i, tt := range tests { + w := httptest.NewRecorder() + id, ok := getID(tt.path, w) + if id != tt.wid { + t.Errorf("#%d: id = %d, want %d", i, id, tt.wid) + } + if ok != tt.wok { + t.Errorf("#%d: ok = %t, want %t", i, ok, tt.wok) + } + if w.Code != tt.wcode { + t.Errorf("#%d code = %d, want %d", i, w.Code, tt.wcode) + } + } +} + type dummyStats struct { data []byte } diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 476a10725..9813ed47d 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -79,6 +79,9 @@ func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { return fs.err } +func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error { + return fs.err +} func TestWriteError(t *testing.T) { // nil error should not panic diff --git a/etcdserver/etcdhttp/httptypes/member.go b/etcdserver/etcdhttp/httptypes/member.go index 6d8554ed0..a71457bcb 100644 --- a/etcdserver/etcdhttp/httptypes/member.go +++ b/etcdserver/etcdhttp/httptypes/member.go @@ -33,6 +33,10 @@ type MemberCreateRequest struct { PeerURLs types.URLs } +type MemberUpdateRequest struct { + MemberCreateRequest +} + func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) { s := struct { PeerURLs []string `json:"peerURLs"` diff --git a/etcdserver/member_test.go b/etcdserver/member_test.go index 1ad254d68..5ec26003e 100644 --- a/etcdserver/member_test.go +++ b/etcdserver/member_test.go @@ -63,7 +63,7 @@ func TestMemberPick(t *testing.T) { urls map[string]bool }{ { - newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil), + newTestMember(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil), map[string]bool{ "abc": true, "def": true, @@ -75,7 +75,7 @@ func TestMemberPick(t *testing.T) { }, }, { - newTestMemberp(2, []string{"xyz"}, "", nil), + newTestMember(2, []string{"xyz"}, "", nil), map[string]bool{"xyz": true}, }, } @@ -92,10 +92,10 @@ func TestMemberPick(t *testing.T) { func TestMemberClone(t *testing.T) { tests := []*Member{ - newTestMemberp(1, nil, "abc", nil), - newTestMemberp(1, []string{"http://a"}, "abc", nil), - newTestMemberp(1, nil, "abc", []string{"http://b"}), - newTestMemberp(1, []string{"http://a"}, "abc", []string{"http://b"}), + newTestMember(1, nil, "abc", nil), + newTestMember(1, []string{"http://a"}, "abc", nil), + newTestMember(1, nil, "abc", []string{"http://b"}), + newTestMember(1, []string{"http://a"}, "abc", []string{"http://b"}), } for i, tt := range tests { nm := tt.Clone() @@ -107,3 +107,11 @@ func TestMemberClone(t *testing.T) { } } } + +func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) *Member { + return &Member{ + ID: types.ID(id), + RaftAttributes: RaftAttributes{PeerURLs: peerURLs}, + Attributes: Attributes{Name: name, ClientURLs: clientURLs}, + } +} diff --git a/etcdserver/sender.go b/etcdserver/sender.go index dbe846f76..b2512057c 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -21,6 +21,9 @@ import ( "fmt" "log" "net/http" + "net/url" + "path" + "sync" "time" "github.com/coreos/etcd/etcdserver/stats" @@ -108,12 +111,30 @@ func (h *sendHub) Remove(id types.ID) { delete(h.senders, id) } +func (h *sendHub) Update(m *Member) { + // TODO: return error or just panic? + if _, ok := h.senders[m.ID]; !ok { + return + } + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + s := h.senders[m.ID] + s.mu.Lock() + defer s.mu.Unlock() + s.u = u.String() +} + type sender struct { u string cid types.ID c *http.Client fs *stats.FollowerStats q chan []byte + mu sync.RWMutex } func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender { @@ -159,7 +180,9 @@ func (s *sender) handle() { // post POSTs a data payload to a url. Returns nil if the POST succeeds, // error on any failure. func (s *sender) post(data []byte) error { + s.mu.RLock() req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) + s.mu.RUnlock() if err != nil { return fmt.Errorf("new request to %s error: %v", s.u, err) } diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index a8b22ae45..25b928b7c 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -24,7 +24,7 @@ import ( ) func TestSendHubInitSenders(t *testing.T) { - membs := []Member{ + membs := []*Member{ newTestMember(1, []string{"http://a"}, "", nil), newTestMember(2, []string{"http://b"}, "", nil), newTestMember(3, []string{"http://c"}, "", nil), @@ -48,7 +48,7 @@ func TestSendHubAdd(t *testing.T) { cl := newTestCluster(nil) ls := stats.NewLeaderStats("") h := newSendHub(nil, cl, nil, ls) - m := newTestMemberp(1, []string{"http://a"}, "", nil) + m := newTestMember(1, []string{"http://a"}, "", nil) h.Add(m) if _, ok := ls.Followers["1"]; !ok { @@ -70,7 +70,7 @@ func TestSendHubAdd(t *testing.T) { } func TestSendHubRemove(t *testing.T) { - membs := []Member{ + membs := []*Member{ newTestMember(1, []string{"http://a"}, "", nil), } cl := newTestCluster(membs) diff --git a/etcdserver/server.go b/etcdserver/server.go index 6bf0a4e77..62c0fb03e 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -37,11 +37,11 @@ import ( "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" - "github.com/coreos/etcd/wait" "github.com/coreos/etcd/wal" ) @@ -89,6 +89,7 @@ type Sender interface { Send(m []raftpb.Message) Add(m *Member) Remove(id types.ID) + Update(m *Member) Stop() } @@ -114,7 +115,7 @@ type Server interface { // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. Stop() - // Do takes a request and attempts to fulfil it, returning a Response. + // Do takes a request and attempts to fulfill it, returning a Response. Do(ctx context.Context, r pb.Request) (Response, error) // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. @@ -127,6 +128,10 @@ type Server interface { // return ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDNotFound if member ID is not in the cluster. RemoveMember(ctx context.Context, id uint64) error + + // UpdateMember attempts to update a existing member in the cluster. It will + // return ErrIDNotFound if the member ID does not exist. + UpdateMember(ctx context.Context, updateMemb Member) error } type Stats interface { @@ -479,6 +484,20 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { return s.configure(ctx, cc) } +func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { + b, err := json.Marshal(memb) + if err != nil { + return err + } + cc := raftpb.ConfChange{ + ID: GenID(), + Type: raftpb.ConfChangeUpdateNode, + NodeID: uint64(memb.ID), + Context: b, + } + return s.configure(ctx, cc) +} + // Implement the RaftTimer interface func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) @@ -669,13 +688,36 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { log.Panicf("nodeID should always be equal to member ID") } s.Cluster.AddMember(m) - s.sender.Add(m) - log.Printf("etcdserver: added node %s %v to cluster %s", types.ID(cc.NodeID), m.PeerURLs, s.Cluster.ID()) + if m.ID == s.id { + log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } else { + s.sender.Add(m) + log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) s.Cluster.RemoveMember(id) - s.sender.Remove(id) - log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID()) + if id == s.id { + log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID()) + } else { + s.sender.Remove(id) + log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) + } + case raftpb.ConfChangeUpdateNode: + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + if cc.NodeID != uint64(m.ID) { + log.Panicf("nodeID should always be equal to member ID") + } + s.Cluster.UpdateMember(m) + if m.ID == s.id { + log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } else { + s.sender.Update(m) + log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } } return nil } @@ -747,7 +789,7 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s * peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } id = member.ID - log.Printf("etcdserver: start node %s in cluster %s", id, cfg.Cluster.ID()) + log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) s = raft.NewMemoryStorage() n = raft.StartNode(uint64(id), peers, 10, 1, s) return diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 772ba2079..6eef6e245 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -407,7 +407,7 @@ func TestApplyRequest(t *testing.T) { } func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { - cl := newTestCluster([]Member{{ID: 1}}) + cl := newTestCluster([]*Member{{ID: 1}}) srv := &EtcdServer{ store: &storeRecorder{}, Cluster: cl, @@ -447,7 +447,7 @@ func TestApplyConfChangeError(t *testing.T) { }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, + Type: raftpb.ConfChangeUpdateNode, NodeID: 4, }, ErrIDRemoved, @@ -503,6 +503,7 @@ func (s *fakeSender) Send(msgs []raftpb.Message) { } } func (s *fakeSender) Add(m *Member) {} +func (s *fakeSender) Update(m *Member) {} func (s *fakeSender) Remove(id types.ID) {} func (s *fakeSender) Stop() {} @@ -559,8 +560,8 @@ func testServer(t *testing.T, ns uint64) { g, w := resp.Event.Node, &store.NodeExtern{ Key: "/foo", - ModifiedIndex: uint64(i) + 2*ns, - CreatedIndex: uint64(i) + 2*ns, + ModifiedIndex: uint64(i) + ns, + CreatedIndex: uint64(i) + ns, Value: stringp("bar"), } @@ -1019,7 +1020,9 @@ func TestRemoveMember(t *testing.T) { Nodes: []uint64{1234, 2345, 3456}, }, } - cl := newTestCluster([]Member{{ID: 1234}}) + cl := newTestCluster(nil) + cl.SetStore(store.New()) + cl.AddMember(&Member{ID: 1234}) s := &EtcdServer{ node: n, raftStorage: raft.NewMemoryStorage(), @@ -1045,6 +1048,44 @@ func TestRemoveMember(t *testing.T) { } } +// TestUpdateMember tests RemoveMember can propose and perform node update. +func TestUpdateMember(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{ + RaftState: raft.StateLeader, + Nodes: []uint64{1234, 2345, 3456}, + }, + } + cl := newTestCluster(nil) + cl.SetStore(store.New()) + cl.AddMember(&Member{ID: 1234}) + s := &EtcdServer{ + node: n, + raftStorage: raft.NewMemoryStorage(), + store: &storeRecorder{}, + sender: &nopSender{}, + storage: &storageRecorder{}, + Cluster: cl, + } + s.start() + wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} + err := s.UpdateMember(context.TODO(), wm) + gaction := n.Action() + s.Stop() + + if err != nil { + t.Fatalf("UpdateMember error: %v", err) + } + wactions := []action{action{name: "ProposeConfChange:ConfChangeUpdateNode"}, action{name: "ApplyConfChange:ConfChangeUpdateNode"}} + if !reflect.DeepEqual(gaction, wactions) { + t.Errorf("action = %v, want %v", gaction, wactions) + } + if !reflect.DeepEqual(cl.Member(1234), &wm) { + t.Errorf("member = %v, want %v", cl.Member(1234), &wm) + } +} + // TODO: test server could stop itself when being removed // TODO: test wait trigger correctness in multi-server case @@ -1129,25 +1170,25 @@ func TestGetOtherPeerURLs(t *testing.T) { }{ { []*Member{ - newTestMemberp(1, []string{"http://10.0.0.1"}, "a", nil), + newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), }, "a", []string{}, }, { []*Member{ - newTestMemberp(1, []string{"http://10.0.0.1"}, "a", nil), - newTestMemberp(2, []string{"http://10.0.0.2"}, "b", nil), - newTestMemberp(3, []string{"http://10.0.0.3"}, "c", nil), + newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), + newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), + newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), }, "a", []string{"http://10.0.0.2", "http://10.0.0.3"}, }, { []*Member{ - newTestMemberp(1, []string{"http://10.0.0.1"}, "a", nil), - newTestMemberp(3, []string{"http://10.0.0.3"}, "c", nil), - newTestMemberp(2, []string{"http://10.0.0.2"}, "b", nil), + newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), + newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), + newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), }, "a", []string{"http://10.0.0.2", "http://10.0.0.3"}, @@ -1474,6 +1515,7 @@ type nopSender struct{} func (s *nopSender) Send(m []raftpb.Message) {} func (s *nopSender) Add(m *Member) {} func (s *nopSender) Remove(id types.ID) {} +func (s *nopSender) Update(m *Member) {} func (s *nopSender) Stop() {} func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { diff --git a/integration/cluster_test.go b/integration/cluster_test.go index f14f61ecd..e3f34765e 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -24,8 +24,9 @@ import ( "net/http" "net/http/httptest" "os" + "reflect" + "sort" "strings" - "sync" "testing" "time" @@ -57,15 +58,7 @@ func testCluster(t *testing.T, size int) { c := NewCluster(t, size) c.Launch(t) defer c.Terminate(t) - for i, u := range c.URLs() { - cc := mustNewHTTPClient(t, []string{u}) - kapi := client.NewKeysAPI(cc) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil { - t.Errorf("create on %s error: %v", u, err) - } - cancel() - } + clusterMustProgress(t, c) } func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) } @@ -88,13 +81,43 @@ func testClusterUsingDiscovery(t *testing.T, size int) { c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys") c.Launch(t) defer c.Terminate(t) + clusterMustProgress(t, c) +} - for i, u := range c.URLs() { +func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) } +func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) } + +func testDoubleClusterSize(t *testing.T, size int) { + defer afterTest(t) + c := NewCluster(t, size) + c.Launch(t) + defer c.Terminate(t) + + for i := 0; i < size; i++ { + c.AddMember(t) + } + clusterMustProgress(t, c) +} + +// clusterMustProgress ensures that cluster can make progress. It creates +// a key first, and check the new key could be got from all client urls of +// the cluster. +func clusterMustProgress(t *testing.T, cl *cluster) { + cc := mustNewHTTPClient(t, []string{cl.URL(0)}) + kapi := client.NewKeysAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + resp, err := kapi.Create(ctx, "/foo", "bar", -1) + if err != nil { + t.Fatalf("create on %s error: %v", cl.URL(0), err) + } + cancel() + + for i, u := range cl.URLs() { cc := mustNewHTTPClient(t, []string{u}) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil { - t.Errorf("create on %s error: %v", u, err) + if _, err := kapi.Watch("foo", resp.Node.ModifiedIndex).Next(ctx); err != nil { + t.Fatalf("#%d: watch on %s error: %v", i, u, err) } cancel() } @@ -111,7 +134,7 @@ func NewCluster(t *testing.T, size int) *cluster { c := &cluster{} ms := make([]*member, size) for i := 0; i < size; i++ { - ms[i] = newMember(t, c.name(i)) + ms[i] = mustNewMember(t, c.name(i)) } c.Members = ms @@ -139,7 +162,7 @@ func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster { c := &cluster{} ms := make([]*member, size) for i := 0; i < size; i++ { - ms[i] = newMember(t, c.name(i)) + ms[i] = mustNewMember(t, c.name(i)) ms[i].DiscoveryURL = url } c.Members = ms @@ -147,19 +170,21 @@ func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster { } func (c *cluster) Launch(t *testing.T) { - var wg sync.WaitGroup + errc := make(chan error) for _, m := range c.Members { - wg.Add(1) // Members are launched in separate goroutines because if they boot // using discovery url, they have to wait for others to register to continue. go func(m *member) { - m.Launch(t) - wg.Done() + errc <- m.Launch() }(m) } - wg.Wait() + for _ = range c.Members { + if err := <-errc; err != nil { + t.Fatalf("error setting up member: %v", err) + } + } // wait cluster to be stable to receive future client requests - c.waitClientURLsPublished(t) + c.waitMembersMatch(t, c.HTTPMembers()) } func (c *cluster) URL(i int) string { @@ -176,47 +201,94 @@ func (c *cluster) URLs() []string { return urls } +func (c *cluster) HTTPMembers() []httptypes.Member { + ms := make([]httptypes.Member, len(c.Members)) + for i, m := range c.Members { + ms[i].Name = m.Name + for _, ln := range m.PeerListeners { + ms[i].PeerURLs = append(ms[i].PeerURLs, "http://"+ln.Addr().String()) + } + for _, ln := range m.ClientListeners { + ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String()) + } + } + return ms +} + +func (c *cluster) AddMember(t *testing.T) { + clusterStr := c.Members[0].Cluster.String() + idx := len(c.Members) + m := mustNewMember(t, c.name(idx)) + + // send add request to the cluster + cc := mustNewHTTPClient(t, []string{c.URL(0)}) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + peerURL := "http://" + m.PeerListeners[0].Addr().String() + if _, err := ma.Add(ctx, peerURL); err != nil { + t.Fatalf("add member on %s error: %v", c.URL(0), err) + } + cancel() + + // wait for the add node entry applied in the cluster + members := append(c.HTTPMembers(), httptypes.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) + c.waitMembersMatch(t, members) + + for _, ln := range m.PeerListeners { + clusterStr += fmt.Sprintf(",%s=http://%s", m.Name, ln.Addr().String()) + } + var err error + m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) + if err != nil { + t.Fatal(err) + } + m.NewCluster = false + if err := m.Launch(); err != nil { + t.Fatal(err) + } + c.Members = append(c.Members, m) + // wait cluster to be stable to receive future client requests + c.waitMembersMatch(t, c.HTTPMembers()) +} + func (c *cluster) Terminate(t *testing.T) { for _, m := range c.Members { m.Terminate(t) } } -func (c *cluster) waitClientURLsPublished(t *testing.T) { - timer := time.AfterFunc(10*time.Second, func() { - t.Fatal("wait too long for client urls publish") - }) - cc := mustNewHTTPClient(t, []string{c.URL(0)}) - ma := client.NewMembersAPI(cc) - for { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - membs, err := ma.List(ctx) - cancel() - if err == nil && c.checkClientURLsPublished(membs) { - break +func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) { + for _, u := range c.URLs() { + cc := mustNewHTTPClient(t, []string{u}) + ma := client.NewMembersAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ms, err := ma.List(ctx) + cancel() + if err == nil && isMembersEqual(ms, membs) { + break + } + time.Sleep(tickDuration) } - time.Sleep(tickDuration) } - timer.Stop() return } -func (c *cluster) checkClientURLsPublished(membs []httptypes.Member) bool { - if len(membs) != len(c.Members) { - return false - } - for _, m := range membs { - if len(m.ClientURLs) == 0 { - return false - } - } - return true -} - func (c *cluster) name(i int) string { return fmt.Sprint("node", i) } +// isMembersEqual checks whether two members equal except ID field. +// The given wmembs should always set ID field to empty string. +func isMembersEqual(membs []httptypes.Member, wmembs []httptypes.Member) bool { + sort.Sort(SortableMemberSliceByPeerURLs(membs)) + sort.Sort(SortableMemberSliceByPeerURLs(wmembs)) + for i := range membs { + membs[i].ID = "" + } + return reflect.DeepEqual(membs, wmembs) +} + func newLocalListener(t *testing.T) net.Listener { l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -233,18 +305,26 @@ type member struct { hss []*httptest.Server } -func newMember(t *testing.T, name string) *member { +func mustNewMember(t *testing.T, name string) *member { var err error m := &member{} + pln := newLocalListener(t) m.PeerListeners = []net.Listener{pln} + m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + cln := newLocalListener(t) m.ClientListeners = []net.Listener{cln} - m.Name = name m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) if err != nil { t.Fatal(err) } + + m.Name = name + m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") if err != nil { t.Fatal(err) @@ -261,13 +341,13 @@ func newMember(t *testing.T, name string) *member { // Launch starts a member based on ServerConfig, PeerListeners // and ClientListeners. -func (m *member) Launch(t *testing.T) { +func (m *member) Launch() error { var err error if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { - t.Fatalf("failed to initialize the etcd server: %v", err) + return fmt.Errorf("failed to initialize the etcd server: %v", err) } m.s.Ticker = time.Tick(tickDuration) - m.s.SyncTicker = time.Tick(10 * tickDuration) + m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() for _, ln := range m.PeerListeners { @@ -286,6 +366,7 @@ func (m *member) Launch(t *testing.T) { hs.Start() m.hss = append(m.hss, hs) } + return nil } // Stop stops the member, but the data dir of the member is preserved. @@ -325,3 +406,11 @@ func newTransport() *http.Transport { tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial return tr } + +type SortableMemberSliceByPeerURLs []httptypes.Member + +func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/integration/v2_http_kv_test.go b/integration/v2_http_kv_test.go index 6047187b7..55addb7cf 100644 --- a/integration/v2_http_kv_test.go +++ b/integration/v2_http_kv_test.go @@ -55,19 +55,19 @@ func TestV2Set(t *testing.T) { "/v2/keys/foo/bar", v, http.StatusCreated, - `{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":4,"createdIndex":4}}`, + `{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":3,"createdIndex":3}}`, }, { "/v2/keys/foodir?dir=true", url.Values{}, http.StatusCreated, - `{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":5,"createdIndex":5}}`, + `{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":4,"createdIndex":4}}`, }, { "/v2/keys/fooempty", url.Values(map[string][]string{"value": {""}}), http.StatusCreated, - `{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":6,"createdIndex":6}}`, + `{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":5,"createdIndex":5}}`, }, } @@ -216,12 +216,12 @@ func TestV2CAS(t *testing.T) { }, { "/v2/keys/cas/foo", - url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"4"}}), + url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"3"}}), http.StatusOK, map[string]interface{}{ "node": map[string]interface{}{ "value": "YYY", - "modifiedIndex": float64(5), + "modifiedIndex": float64(4), }, "action": "compareAndSwap", }, @@ -233,8 +233,8 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[10 != 5]", - "index": float64(5), + "cause": "[10 != 4]", + "index": float64(4), }, }, { @@ -283,7 +283,7 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[bad_value != ZZZ] [100 != 6]", + "cause": "[bad_value != ZZZ] [100 != 5]", }, }, { @@ -293,12 +293,12 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[100 != 6]", + "cause": "[100 != 5]", }, }, { "/v2/keys/cas/foo", - url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"6"}}), + url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"5"}}), http.StatusPreconditionFailed, map[string]interface{}{ "errorCode": float64(101), @@ -448,7 +448,7 @@ func TestV2CAD(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[100 != 4]", + "cause": "[100 != 3]", }, }, { @@ -460,12 +460,12 @@ func TestV2CAD(t *testing.T) { }, }, { - "/v2/keys/foo?prevIndex=4", + "/v2/keys/foo?prevIndex=3", http.StatusOK, map[string]interface{}{ "node": map[string]interface{}{ "key": "/foo", - "modifiedIndex": float64(6), + "modifiedIndex": float64(5), }, "action": "compareAndDelete", }, @@ -493,7 +493,7 @@ func TestV2CAD(t *testing.T) { map[string]interface{}{ "node": map[string]interface{}{ "key": "/foovalue", - "modifiedIndex": float64(7), + "modifiedIndex": float64(6), }, "action": "compareAndDelete", }, @@ -531,7 +531,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/foo/4", + "key": "/foo/3", "value": "XXX", }, "action": "create", @@ -543,7 +543,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/foo/5", + "key": "/foo/4", "value": "XXX", }, "action": "create", @@ -555,7 +555,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/bar/6", + "key": "/bar/5", "value": "XXX", }, "action": "create", @@ -617,8 +617,8 @@ func TestV2Get(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -636,14 +636,14 @@ func TestV2Get(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), "nodes": []interface{}{ map[string]interface{}{ "key": "/foo/bar/zar", "value": "XXX", - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -711,8 +711,8 @@ func TestV2QuorumGet(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -730,14 +730,14 @@ func TestV2QuorumGet(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), "nodes": []interface{}{ map[string]interface{}{ "key": "/foo/bar/zar", "value": "XXX", - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -797,7 +797,7 @@ func TestV2Watch(t *testing.T) { "node": map[string]interface{}{ "key": "/foo/bar", "value": "XXX", - "modifiedIndex": float64(4), + "modifiedIndex": float64(3), }, "action": "set", } @@ -818,7 +818,7 @@ func TestV2WatchWithIndex(t *testing.T) { var body map[string]interface{} c := make(chan bool, 1) go func() { - resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=5")) + resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=4")) body = tc.ReadBodyJSON(resp) c <- true }() @@ -855,7 +855,7 @@ func TestV2WatchWithIndex(t *testing.T) { "node": map[string]interface{}{ "key": "/foo/bar", "value": "XXX", - "modifiedIndex": float64(5), + "modifiedIndex": float64(4), }, "action": "set", } diff --git a/pkg/transport/listener.go b/pkg/transport/listener.go index b4172dde7..a8d3eac5c 100644 --- a/pkg/transport/listener.go +++ b/pkg/transport/listener.go @@ -27,13 +27,13 @@ import ( "time" ) -func NewListener(addr string, info TLSInfo) (net.Listener, error) { +func NewListener(addr string, scheme string, info TLSInfo) (net.Listener, error) { l, err := net.Listen("tcp", addr) if err != nil { return nil, err } - if !info.Empty() { + if !info.Empty() && scheme == "https" { cfg, err := info.ServerConfig() if err != nil { return nil, err diff --git a/wait/wait.go b/pkg/wait/wait.go similarity index 100% rename from wait/wait.go rename to pkg/wait/wait.go diff --git a/wait/wait_test.go b/pkg/wait/wait_test.go similarity index 100% rename from wait/wait_test.go rename to pkg/wait/wait_test.go diff --git a/proxy/reverse.go b/proxy/reverse.go index 2dfb313f0..c9addd2ed 100644 --- a/proxy/reverse.go +++ b/proxy/reverse.go @@ -17,12 +17,15 @@ package proxy import ( + "fmt" "io" "log" "net" "net/http" "net/url" "strings" + + "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" ) // Hop-by-hop headers. These are removed when sent to the backend. @@ -64,8 +67,11 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request endpoints := p.director.endpoints() if len(endpoints) == 0 { - log.Printf("proxy: zero endpoints currently available") - rw.WriteHeader(http.StatusServiceUnavailable) + msg := "proxy: zero endpoints currently available" + // TODO: limit the rate of the error logging. + log.Printf(msg) + e := httptypes.NewHTTPError(http.StatusServiceUnavailable, msg) + e.WriteTo(rw) return } @@ -86,8 +92,11 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request } if res == nil { - log.Printf("proxy: unable to get response from %d endpoint(s)", len(endpoints)) - rw.WriteHeader(http.StatusBadGateway) + // TODO: limit the rate of the error logging. + msg := fmt.Sprintf("proxy: unable to get response from %d endpoint(s)", len(endpoints)) + log.Printf(msg) + e := httptypes.NewHTTPError(http.StatusBadGateway, msg) + e.WriteTo(rw) return } diff --git a/proxy/reverse_test.go b/proxy/reverse_test.go index 0a74dd8a7..499b8d879 100644 --- a/proxy/reverse_test.go +++ b/proxy/reverse_test.go @@ -70,6 +70,7 @@ func TestReverseProxyServe(t *testing.T) { res: &http.Response{ StatusCode: http.StatusCreated, Body: ioutil.NopCloser(&bytes.Reader{}), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, }, }, want: http.StatusCreated, @@ -89,6 +90,9 @@ func TestReverseProxyServe(t *testing.T) { if rr.Code != tt.want { t.Errorf("#%d: unexpected HTTP status code: want = %d, got = %d", i, tt.want, rr.Code) } + if gct := rr.Header().Get("Content-Type"); gct != "application/json" { + t.Errorf("#%d: Content-Type = %s, want %s", i, gct, "application/json") + } } } diff --git a/raft/log.go b/raft/log.go index 1203bcb28..63edccbb1 100644 --- a/raft/log.go +++ b/raft/log.go @@ -176,10 +176,9 @@ func (l *raftLog) appliedTo(i uint64) { } func (l *raftLog) stableTo(i uint64) { - if i == 0 { - return + if len(l.unstableEnts) > 0 { + l.unstableEnts = l.unstableEnts[i+1-l.unstable:] } - l.unstableEnts = l.unstableEnts[i+1-l.unstable:] l.unstable = i + 1 } diff --git a/raft/log_test.go b/raft/log_test.go index 4965c8054..21ff06fad 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -345,6 +345,7 @@ func TestUnstableEnts(t *testing.T) { }{ {3, nil, 3}, {1, previousEnts[1:], 3}, + {0, append([]pb.Entry{{}}, previousEnts...), 3}, } for i, tt := range tests { @@ -353,7 +354,9 @@ func TestUnstableEnts(t *testing.T) { raftLog := newLog(storage) raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable:]...) ents := raftLog.unstableEntries() - raftLog.stableTo(raftLog.lastIndex()) + if l := len(ents); l > 0 { + raftLog.stableTo(ents[l-1].Index) + } if !reflect.DeepEqual(ents, tt.wents) { t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents) } @@ -363,6 +366,25 @@ func TestUnstableEnts(t *testing.T) { } } +func TestStableTo(t *testing.T) { + tests := []struct { + stable uint64 + wunstable uint64 + }{ + {0, 1}, + {1, 2}, + {2, 3}, + } + for i, tt := range tests { + raftLog := newLog(NewMemoryStorage()) + raftLog.append(0, []pb.Entry{{}, {}}...) + raftLog.stableTo(tt.stable) + if raftLog.unstable != tt.wunstable { + t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable) + } + } +} + //TestCompaction ensures that the number of log entries is correct after compactions. func TestCompaction(t *testing.T) { tests := []struct { diff --git a/raft/node.go b/raft/node.go index 411085ae1..010b935ff 100644 --- a/raft/node.go +++ b/raft/node.go @@ -143,7 +143,7 @@ type Peer struct { // StartNode returns a new Node given a unique raft id, a list of raft peers, and // the election and heartbeat timeouts in units of ticks. -// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log. +// It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node { n := newNode() r := newRaft(id, nil, election, heartbeat, storage) @@ -191,6 +191,7 @@ type node struct { advancec chan struct{} tickc chan struct{} done chan struct{} + stop chan struct{} } func newNode() node { @@ -203,11 +204,20 @@ func newNode() node { advancec: make(chan struct{}), tickc: make(chan struct{}), done: make(chan struct{}), + stop: make(chan struct{}), } } func (n *node) Stop() { - close(n.done) + select { + case n.stop <- struct{}{}: + // Not already stopped, so trigger it + case <-n.done: + // Node has already been stopped - no need to do anything + return + } + // Block until the stop has been acknowledged by run() + <-n.done } func (n *node) run(r *raft) { @@ -270,6 +280,8 @@ func (n *node) run(r *raft) { r.addNode(cc.NodeID) case pb.ConfChangeRemoveNode: r.removeNode(cc.NodeID) + case pb.ConfChangeUpdateNode: + r.resetPendingConf() default: panic("unexpected conf type") } @@ -297,11 +309,10 @@ func (n *node) run(r *raft) { if prevHardSt.Commit != 0 { r.raftLog.appliedTo(prevHardSt.Commit) } - if prevLastUnstablei != 0 { - r.raftLog.stableTo(prevLastUnstablei) - } + r.raftLog.stableTo(prevLastUnstablei) advancec = nil - case <-n.done: + case <-n.stop: + close(n.done) return } } diff --git a/raft/node_test.go b/raft/node_test.go index 9639581b5..cec08c4cc 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -107,6 +107,86 @@ func TestNodeStepUnblock(t *testing.T) { } } +// TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft. +func TestNodePropose(t *testing.T) { + msgs := []raftpb.Message{} + appendStep := func(r *raft, m raftpb.Message) { + msgs = append(msgs, m) + } + + n := newNode() + s := NewMemoryStorage() + r := newRaft(1, []uint64{1}, 10, 1, s) + go n.run(r) + n.Campaign(context.TODO()) + for { + rd := <-n.Ready() + s.Append(rd.Entries) + // change the step function to appendStep until this raft becomes leader + if rd.SoftState.Lead == r.id { + r.step = appendStep + n.Advance() + break + } + n.Advance() + } + n.Propose(context.TODO(), []byte("somedata")) + n.Stop() + + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + } + if msgs[0].Type != raftpb.MsgProp { + t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp) + } + if !reflect.DeepEqual(msgs[0].Entries[0].Data, []byte("somedata")) { + t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata")) + } +} + +// TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal +// to the underlying raft. +func TestNodeProposeConfig(t *testing.T) { + msgs := []raftpb.Message{} + appendStep := func(r *raft, m raftpb.Message) { + msgs = append(msgs, m) + } + + n := newNode() + s := NewMemoryStorage() + r := newRaft(1, []uint64{1}, 10, 1, s) + go n.run(r) + n.Campaign(context.TODO()) + for { + rd := <-n.Ready() + s.Append(rd.Entries) + // change the step function to appendStep until this raft becomes leader + if rd.SoftState.Lead == r.id { + r.step = appendStep + n.Advance() + break + } + n.Advance() + } + cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + ccdata, err := cc.Marshal() + if err != nil { + t.Fatal(err) + } + n.ProposeConfChange(context.TODO(), cc) + n.Stop() + + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + } + if msgs[0].Type != raftpb.MsgProp { + t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp) + } + if !reflect.DeepEqual(msgs[0].Entries[0].Data, ccdata) { + t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata) + } +} + // TestBlockProposal ensures that node will block proposal when it does not // know who is the current leader; node will accept proposal when it knows // who is the current leader. @@ -140,6 +220,56 @@ func TestBlockProposal(t *testing.T) { } } +// TestNodeTick ensures that node.Tick() will increase the +// elapsed of the underlying raft state machine. +func TestNodeTick(t *testing.T) { + n := newNode() + s := NewMemoryStorage() + r := newRaft(1, []uint64{1}, 10, 1, s) + go n.run(r) + elapsed := r.elapsed + n.Tick() + n.Stop() + if r.elapsed != elapsed+1 { + t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1) + } +} + +// TestNodeStop ensures that node.Stop() blocks until the node has stopped +// processing, and that it is idempotent +func TestNodeStop(t *testing.T) { + n := newNode() + s := NewMemoryStorage() + r := newRaft(1, []uint64{1}, 10, 1, s) + donec := make(chan struct{}) + + go func() { + n.run(r) + close(donec) + }() + + elapsed := r.elapsed + n.Tick() + n.Stop() + + select { + case <-donec: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for node to stop!") + } + + if r.elapsed != elapsed+1 { + t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1) + } + // Further ticks should have no effect, the node is stopped. + n.Tick() + if r.elapsed != elapsed+1 { + t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1) + } + // Subsequent Stops should have no effect. + n.Stop() +} + func TestReadyContainUpdates(t *testing.T) { tests := []struct { rd Ready @@ -161,7 +291,10 @@ func TestReadyContainUpdates(t *testing.T) { } } -func TestNode(t *testing.T) { +// TestNodeStart ensures that a node can be started correctly. The node should +// start with correct configuration change entries, and can accept and commit +// proposals. +func TestNodeStart(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -236,9 +369,8 @@ func TestNodeRestart(t *testing.T) { n := RestartNode(1, 10, 1, nil, st, storage) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) - } else { - n.Advance() } + n.Advance() select { case rd := <-n.Ready(): diff --git a/raft/raft.go b/raft/raft.go index b98b0af00..9ec1c6145 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -531,6 +531,7 @@ func (r *raft) nodes() []uint64 { for k := range r.prs { nodes = append(nodes, k) } + sort.Sort(uint64Slice(nodes)) return nodes } diff --git a/raft/raft_test.go b/raft/raft_test.go index 5f899e154..21c8dfbef 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1040,7 +1040,6 @@ func TestRestore(t *testing.T) { t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term) } sg := sm.nodes() - sort.Sort(uint64Slice(sg)) if !reflect.DeepEqual(sg, s.Nodes) { t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes) } @@ -1209,7 +1208,6 @@ func TestAddNode(t *testing.T) { t.Errorf("pendingConf = %v, want false", r.pendingConf) } nodes := r.nodes() - sort.Sort(uint64Slice(nodes)) wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) @@ -1253,6 +1251,28 @@ func TestPromotable(t *testing.T) { } } +func TestRaftNodes(t *testing.T) { + tests := []struct { + ids []uint64 + wids []uint64 + }{ + { + []uint64{1, 2, 3}, + []uint64{1, 2, 3}, + }, + { + []uint64{3, 2, 1}, + []uint64{1, 2, 3}, + }, + } + for i, tt := range tests { + r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage()) + if !reflect.DeepEqual(r.nodes(), tt.wids) { + t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) + } + } +} + func ents(terms ...uint64) *raft { ents := []pb.Entry{{}} for _, term := range terms { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 898df719c..03bdb8c83 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -75,7 +75,6 @@ const ( MsgVote MessageType = 5 MsgVoteResp MessageType = 6 MsgSnap MessageType = 7 - MsgDenied MessageType = 8 ) var MessageType_name = map[int32]string{ @@ -87,7 +86,6 @@ var MessageType_name = map[int32]string{ 5: "MsgVote", 6: "MsgVoteResp", 7: "MsgSnap", - 8: "MsgDenied", } var MessageType_value = map[string]int32{ "MsgHup": 0, @@ -98,7 +96,6 @@ var MessageType_value = map[string]int32{ "MsgVote": 5, "MsgVoteResp": 6, "MsgSnap": 7, - "MsgDenied": 8, } func (x MessageType) Enum() *MessageType { @@ -123,15 +120,18 @@ type ConfChangeType int32 const ( ConfChangeAddNode ConfChangeType = 0 ConfChangeRemoveNode ConfChangeType = 1 + ConfChangeUpdateNode ConfChangeType = 2 ) var ConfChangeType_name = map[int32]string{ 0: "ConfChangeAddNode", 1: "ConfChangeRemoveNode", + 2: "ConfChangeUpdateNode", } var ConfChangeType_value = map[string]int32{ "ConfChangeAddNode": 0, "ConfChangeRemoveNode": 1, + "ConfChangeUpdateNode": 2, } func (x ConfChangeType) Enum() *ConfChangeType { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 47c9ec2ef..7b60393b8 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -60,6 +60,7 @@ message HardState { enum ConfChangeType { ConfChangeAddNode = 0; ConfChangeRemoveNode = 1; + ConfChangeUpdateNode = 2; } message ConfChange { diff --git a/test b/test index 893284f58..defc1be78 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration pkg/flags pkg/types pkg/transport proxy raft snap store wait wal" +TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/" # user has not provided PKG override