From 6fa8f7763841460c1dd2d2b22b5dddd96ac00250 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 10 Nov 2014 15:56:42 -0800 Subject: [PATCH 01/27] proxy: return JSON errors --- proxy/reverse.go | 15 +++++++++++---- proxy/reverse_test.go | 4 ++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/proxy/reverse.go b/proxy/reverse.go index 2dfb313f0..e1f9328a1 100644 --- a/proxy/reverse.go +++ b/proxy/reverse.go @@ -17,12 +17,15 @@ package proxy import ( + "fmt" "io" "log" "net" "net/http" "net/url" "strings" + + "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" ) // Hop-by-hop headers. These are removed when sent to the backend. @@ -64,8 +67,10 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request endpoints := p.director.endpoints() if len(endpoints) == 0 { - log.Printf("proxy: zero endpoints currently available") - rw.WriteHeader(http.StatusServiceUnavailable) + msg := "proxy: zero endpoints currently available" + log.Printf(msg) + e := httptypes.NewHTTPError(http.StatusServiceUnavailable, msg) + e.WriteTo(rw) return } @@ -86,8 +91,10 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request } if res == nil { - log.Printf("proxy: unable to get response from %d endpoint(s)", len(endpoints)) - rw.WriteHeader(http.StatusBadGateway) + msg := fmt.Sprintf("proxy: unable to get response from %d endpoint(s)", len(endpoints)) + log.Printf(msg) + e := httptypes.NewHTTPError(http.StatusBadGateway, msg) + e.WriteTo(rw) return } diff --git a/proxy/reverse_test.go b/proxy/reverse_test.go index 0a74dd8a7..499b8d879 100644 --- a/proxy/reverse_test.go +++ b/proxy/reverse_test.go @@ -70,6 +70,7 @@ func TestReverseProxyServe(t *testing.T) { res: &http.Response{ StatusCode: http.StatusCreated, Body: ioutil.NopCloser(&bytes.Reader{}), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, }, }, want: http.StatusCreated, @@ -89,6 +90,9 @@ func TestReverseProxyServe(t *testing.T) { if rr.Code != tt.want { t.Errorf("#%d: unexpected HTTP status code: want = %d, got = %d", i, tt.want, rr.Code) } + if gct := rr.Header().Get("Content-Type"); gct != "application/json" { + t.Errorf("#%d: Content-Type = %s, want %s", i, gct, "application/json") + } } } From 424377f859f6a28083bfe5b0c8eb3baec2b377b9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 10 Nov 2014 16:37:15 -0800 Subject: [PATCH 02/27] proxy: add a todo for logging --- proxy/reverse.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/proxy/reverse.go b/proxy/reverse.go index e1f9328a1..c9addd2ed 100644 --- a/proxy/reverse.go +++ b/proxy/reverse.go @@ -68,6 +68,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request endpoints := p.director.endpoints() if len(endpoints) == 0 { msg := "proxy: zero endpoints currently available" + // TODO: limit the rate of the error logging. log.Printf(msg) e := httptypes.NewHTTPError(http.StatusServiceUnavailable, msg) e.WriteTo(rw) @@ -91,6 +92,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request } if res == nil { + // TODO: limit the rate of the error logging. msg := fmt.Sprintf("proxy: unable to get response from %d endpoint(s)", len(endpoints)) log.Printf(msg) e := httptypes.NewHTTPError(http.StatusBadGateway, msg) From b1c3c4a202ef1db3642d8381ce4ee7e18a3c1275 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 10 Nov 2014 13:53:42 -0800 Subject: [PATCH 03/27] integration: rewrite the way to check cluster make progress --- integration/cluster_test.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index f14f61ecd..e70d5e142 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -57,15 +57,7 @@ func testCluster(t *testing.T, size int) { c := NewCluster(t, size) c.Launch(t) defer c.Terminate(t) - for i, u := range c.URLs() { - cc := mustNewHTTPClient(t, []string{u}) - kapi := client.NewKeysAPI(cc) - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil { - t.Errorf("create on %s error: %v", u, err) - } - cancel() - } + clusterMustProgress(t, c) } func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) } @@ -88,13 +80,28 @@ func testClusterUsingDiscovery(t *testing.T, size int) { c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys") c.Launch(t) defer c.Terminate(t) + clusterMustProgress(t, c) +} - for i, u := range c.URLs() { +// clusterMustProgress ensures that cluster can make progress. It creates +// a key first, and check the new key could be got from all client urls of +// the cluster. +func clusterMustProgress(t *testing.T, cl *cluster) { + cc := mustNewHTTPClient(t, []string{cl.URL(0)}) + kapi := client.NewKeysAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + resp, err := kapi.Create(ctx, "/foo", "bar", -1) + if err != nil { + t.Fatalf("create on %s error: %v", cl.URL(0), err) + } + cancel() + + for i, u := range cl.URLs() { cc := mustNewHTTPClient(t, []string{u}) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil { - t.Errorf("create on %s error: %v", u, err) + if _, err := kapi.Watch("foo", resp.Node.ModifiedIndex).Next(ctx); err != nil { + t.Fatalf("#%d: watch on %s error: %v", i, u, err) } cancel() } From 24edf57e12d44b4954a50efa2b7c96e2db617879 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 10 Nov 2014 16:46:41 -0800 Subject: [PATCH 04/27] integration: newMember -> mustNewMember --- integration/cluster_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index e70d5e142..d648287a5 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -118,7 +118,7 @@ func NewCluster(t *testing.T, size int) *cluster { c := &cluster{} ms := make([]*member, size) for i := 0; i < size; i++ { - ms[i] = newMember(t, c.name(i)) + ms[i] = mustNewMember(t, c.name(i)) } c.Members = ms @@ -146,7 +146,7 @@ func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster { c := &cluster{} ms := make([]*member, size) for i := 0; i < size; i++ { - ms[i] = newMember(t, c.name(i)) + ms[i] = mustNewMember(t, c.name(i)) ms[i].DiscoveryURL = url } c.Members = ms @@ -240,7 +240,7 @@ type member struct { hss []*httptest.Server } -func newMember(t *testing.T, name string) *member { +func mustNewMember(t *testing.T, name string) *member { var err error m := &member{} pln := newLocalListener(t) From f64963de8840f96a3fbfc07c61c3118317594785 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 10 Nov 2014 17:05:30 -0800 Subject: [PATCH 05/27] raftpb: fix proto --- raft/raftpb/raft.pb.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 898df719c..db726576c 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -75,7 +75,6 @@ const ( MsgVote MessageType = 5 MsgVoteResp MessageType = 6 MsgSnap MessageType = 7 - MsgDenied MessageType = 8 ) var MessageType_name = map[int32]string{ @@ -87,7 +86,6 @@ var MessageType_name = map[int32]string{ 5: "MsgVote", 6: "MsgVoteResp", 7: "MsgSnap", - 8: "MsgDenied", } var MessageType_value = map[string]int32{ "MsgHup": 0, @@ -98,7 +96,6 @@ var MessageType_value = map[string]int32{ "MsgVote": 5, "MsgVoteResp": 6, "MsgSnap": 7, - "MsgDenied": 8, } func (x MessageType) Enum() *MessageType { From 077e144e8a4abdf0cf5452d505961ea04b89c891 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 11 Nov 2014 11:02:50 -0800 Subject: [PATCH 06/27] etcdserver: move newTestMember* to member_test.go --- etcdserver/cluster_test.go | 13 ------------- etcdserver/member_test.go | 13 +++++++++++++ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 9fe6ca348..8506b7929 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -613,16 +613,3 @@ func newTestCluster(membs []Member) *Cluster { } return c } - -func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member { - return Member{ - ID: types.ID(id), - RaftAttributes: RaftAttributes{PeerURLs: peerURLs}, - Attributes: Attributes{Name: name, ClientURLs: clientURLs}, - } -} - -func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member { - m := newTestMember(id, peerURLs, name, clientURLs) - return &m -} diff --git a/etcdserver/member_test.go b/etcdserver/member_test.go index 1ad254d68..2e3071e8f 100644 --- a/etcdserver/member_test.go +++ b/etcdserver/member_test.go @@ -107,3 +107,16 @@ func TestMemberClone(t *testing.T) { } } } + +func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member { + return Member{ + ID: types.ID(id), + RaftAttributes: RaftAttributes{PeerURLs: peerURLs}, + Attributes: Attributes{Name: name, ClientURLs: clientURLs}, + } +} + +func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member { + m := newTestMember(id, peerURLs, name, clientURLs) + return &m +} From e4931e0c470fcbe427e16f3910aa89104b70bb89 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 11 Nov 2014 11:09:33 -0800 Subject: [PATCH 07/27] etcdserver: remove unnecessary newTestMemberp --- etcdserver/cluster_test.go | 90 +++++++++++++++++++------------------- etcdserver/member_test.go | 21 ++++----- etcdserver/sender_test.go | 10 ++--- etcdserver/server_test.go | 14 +++--- 4 files changed, 65 insertions(+), 70 deletions(-) diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 8506b7929..503bc468e 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -36,9 +36,9 @@ func TestClusterFromString(t *testing.T) { { "mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379", []Member{ - newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), - newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil), - newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil), + *newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), + *newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil), + *newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil), }, }, } @@ -83,15 +83,15 @@ func TestClusterFromStore(t *testing.T) { mems []Member }{ { - []Member{newTestMember(1, nil, "node1", nil)}, + []Member{*newTestMember(1, nil, "node1", nil)}, }, { []Member{}, }, { []Member{ - newTestMember(1, nil, "node1", nil), - newTestMember(2, nil, "node2", nil), + *newTestMember(1, nil, "node1", nil), + *newTestMember(2, nil, "node2", nil), }, }, } @@ -113,8 +113,8 @@ func TestClusterFromStore(t *testing.T) { func TestClusterMember(t *testing.T) { membs := []Member{ - newTestMember(1, nil, "node1", nil), - newTestMember(2, nil, "node2", nil), + *newTestMember(1, nil, "node1", nil), + *newTestMember(2, nil, "node2", nil), } tests := []struct { id types.ID @@ -138,8 +138,8 @@ func TestClusterMember(t *testing.T) { func TestClusterMemberByName(t *testing.T) { membs := []Member{ - newTestMember(1, nil, "node1", nil), - newTestMember(2, nil, "node2", nil), + *newTestMember(1, nil, "node1", nil), + *newTestMember(2, nil, "node2", nil), } tests := []struct { name string @@ -163,9 +163,9 @@ func TestClusterMemberByName(t *testing.T) { func TestClusterMemberIDs(t *testing.T) { c := newTestCluster([]Member{ - newTestMember(1, nil, "", nil), - newTestMember(4, nil, "", nil), - newTestMember(100, nil, "", nil), + *newTestMember(1, nil, "", nil), + *newTestMember(4, nil, "", nil), + *newTestMember(100, nil, "", nil), }) w := []types.ID{1, 4, 100} g := c.MemberIDs() @@ -182,7 +182,7 @@ func TestClusterPeerURLs(t *testing.T) { // single peer with a single address { mems: []Member{ - newTestMember(1, []string{"http://192.0.2.1"}, "", nil), + *newTestMember(1, []string{"http://192.0.2.1"}, "", nil), }, wurls: []string{"http://192.0.2.1"}, }, @@ -190,7 +190,7 @@ func TestClusterPeerURLs(t *testing.T) { // single peer with a single address with a port { mems: []Member{ - newTestMember(1, []string{"http://192.0.2.1:8001"}, "", nil), + *newTestMember(1, []string{"http://192.0.2.1:8001"}, "", nil), }, wurls: []string{"http://192.0.2.1:8001"}, }, @@ -198,9 +198,9 @@ func TestClusterPeerURLs(t *testing.T) { // several members explicitly unsorted { mems: []Member{ - newTestMember(2, []string{"http://192.0.2.3", "http://192.0.2.4"}, "", nil), - newTestMember(3, []string{"http://192.0.2.5", "http://192.0.2.6"}, "", nil), - newTestMember(1, []string{"http://192.0.2.1", "http://192.0.2.2"}, "", nil), + *newTestMember(2, []string{"http://192.0.2.3", "http://192.0.2.4"}, "", nil), + *newTestMember(3, []string{"http://192.0.2.5", "http://192.0.2.6"}, "", nil), + *newTestMember(1, []string{"http://192.0.2.1", "http://192.0.2.2"}, "", nil), }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, @@ -214,7 +214,7 @@ func TestClusterPeerURLs(t *testing.T) { // peer with no peer urls { mems: []Member{ - newTestMember(3, []string{}, "", nil), + *newTestMember(3, []string{}, "", nil), }, wurls: []string{}, }, @@ -237,7 +237,7 @@ func TestClusterClientURLs(t *testing.T) { // single peer with a single address { mems: []Member{ - newTestMember(1, nil, "", []string{"http://192.0.2.1"}), + *newTestMember(1, nil, "", []string{"http://192.0.2.1"}), }, wurls: []string{"http://192.0.2.1"}, }, @@ -245,7 +245,7 @@ func TestClusterClientURLs(t *testing.T) { // single peer with a single address with a port { mems: []Member{ - newTestMember(1, nil, "", []string{"http://192.0.2.1:8001"}), + *newTestMember(1, nil, "", []string{"http://192.0.2.1:8001"}), }, wurls: []string{"http://192.0.2.1:8001"}, }, @@ -253,9 +253,9 @@ func TestClusterClientURLs(t *testing.T) { // several members explicitly unsorted { mems: []Member{ - newTestMember(2, nil, "", []string{"http://192.0.2.3", "http://192.0.2.4"}), - newTestMember(3, nil, "", []string{"http://192.0.2.5", "http://192.0.2.6"}), - newTestMember(1, nil, "", []string{"http://192.0.2.1", "http://192.0.2.2"}), + *newTestMember(2, nil, "", []string{"http://192.0.2.3", "http://192.0.2.4"}), + *newTestMember(3, nil, "", []string{"http://192.0.2.5", "http://192.0.2.6"}), + *newTestMember(1, nil, "", []string{"http://192.0.2.1", "http://192.0.2.2"}), }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, @@ -269,7 +269,7 @@ func TestClusterClientURLs(t *testing.T) { // peer with no client urls { mems: []Member{ - newTestMember(3, nil, "", []string{}), + *newTestMember(3, nil, "", []string{}), }, wurls: []string{}, }, @@ -292,28 +292,28 @@ func TestClusterValidateAndAssignIDsBad(t *testing.T) { { // unmatched length []Member{ - newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), }, []*Member{}, }, { // unmatched peer urls []Member{ - newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), }, []*Member{ - newTestMemberp(1, []string{"http://127.0.0.1:4001"}, "", nil), + newTestMember(1, []string{"http://127.0.0.1:4001"}, "", nil), }, }, { // unmatched peer urls []Member{ - newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), - newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), + *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + *newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), }, []*Member{ - newTestMemberp(1, []string{"http://127.0.0.1:2379"}, "", nil), - newTestMemberp(2, []string{"http://127.0.0.2:4001"}, "", nil), + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(2, []string{"http://127.0.0.2:4001"}, "", nil), }, }, } @@ -333,12 +333,12 @@ func TestClusterValidateAndAssignIDs(t *testing.T) { }{ { []Member{ - newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), - newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), + *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + *newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), }, []*Member{ - newTestMemberp(3, []string{"http://127.0.0.1:2379"}, "", nil), - newTestMemberp(4, []string{"http://127.0.0.2:2379"}, "", nil), + newTestMember(3, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(4, []string{"http://127.0.0.2:2379"}, "", nil), }, []types.ID{3, 4}, }, @@ -427,8 +427,8 @@ func TestClusterValidateConfigurationChange(t *testing.T) { func TestClusterGenID(t *testing.T) { cs := newTestCluster([]Member{ - newTestMember(1, nil, "", nil), - newTestMember(2, nil, "", nil), + *newTestMember(1, nil, "", nil), + *newTestMember(2, nil, "", nil), }) cs.genID() @@ -438,7 +438,7 @@ func TestClusterGenID(t *testing.T) { previd := cs.ID() cs.SetStore(&storeRecorder{}) - cs.AddMember(newTestMemberp(3, nil, "", nil)) + cs.AddMember(newTestMember(3, nil, "", nil)) cs.genID() if cs.ID() == previd { t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) @@ -481,7 +481,7 @@ func TestClusterAddMember(t *testing.T) { st := &storeRecorder{} c := newTestCluster(nil) c.SetStore(st) - c.AddMember(newTestMemberp(1, nil, "node1", nil)) + c.AddMember(newTestMember(1, nil, "node1", nil)) wactions := []action{ { @@ -535,32 +535,32 @@ func TestClusterMembers(t *testing.T) { func TestClusterString(t *testing.T) { cls := &Cluster{ members: map[types.ID]*Member{ - 1: newTestMemberp( + 1: newTestMember( 1, []string{"http://1.1.1.1:1111", "http://0.0.0.0:0000"}, "abc", nil, ), - 2: newTestMemberp( + 2: newTestMember( 2, []string{"http://2.2.2.2:2222"}, "def", nil, ), - 3: newTestMemberp( + 3: newTestMember( 3, []string{"http://3.3.3.3:1234", "http://127.0.0.1:7001"}, "ghi", nil, ), // no PeerURLs = not included - 4: newTestMemberp( + 4: newTestMember( 4, []string{}, "four", nil, ), - 5: newTestMemberp( + 5: newTestMember( 5, nil, "five", diff --git a/etcdserver/member_test.go b/etcdserver/member_test.go index 2e3071e8f..5ec26003e 100644 --- a/etcdserver/member_test.go +++ b/etcdserver/member_test.go @@ -63,7 +63,7 @@ func TestMemberPick(t *testing.T) { urls map[string]bool }{ { - newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil), + newTestMember(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil), map[string]bool{ "abc": true, "def": true, @@ -75,7 +75,7 @@ func TestMemberPick(t *testing.T) { }, }, { - newTestMemberp(2, []string{"xyz"}, "", nil), + newTestMember(2, []string{"xyz"}, "", nil), map[string]bool{"xyz": true}, }, } @@ -92,10 +92,10 @@ func TestMemberPick(t *testing.T) { func TestMemberClone(t *testing.T) { tests := []*Member{ - newTestMemberp(1, nil, "abc", nil), - newTestMemberp(1, []string{"http://a"}, "abc", nil), - newTestMemberp(1, nil, "abc", []string{"http://b"}), - newTestMemberp(1, []string{"http://a"}, "abc", []string{"http://b"}), + newTestMember(1, nil, "abc", nil), + newTestMember(1, []string{"http://a"}, "abc", nil), + newTestMember(1, nil, "abc", []string{"http://b"}), + newTestMember(1, []string{"http://a"}, "abc", []string{"http://b"}), } for i, tt := range tests { nm := tt.Clone() @@ -108,15 +108,10 @@ func TestMemberClone(t *testing.T) { } } -func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member { - return Member{ +func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) *Member { + return &Member{ ID: types.ID(id), RaftAttributes: RaftAttributes{PeerURLs: peerURLs}, Attributes: Attributes{Name: name, ClientURLs: clientURLs}, } } - -func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member { - m := newTestMember(id, peerURLs, name, clientURLs) - return &m -} diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index a8b22ae45..b193c2657 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -25,9 +25,9 @@ import ( func TestSendHubInitSenders(t *testing.T) { membs := []Member{ - newTestMember(1, []string{"http://a"}, "", nil), - newTestMember(2, []string{"http://b"}, "", nil), - newTestMember(3, []string{"http://c"}, "", nil), + *newTestMember(1, []string{"http://a"}, "", nil), + *newTestMember(2, []string{"http://b"}, "", nil), + *newTestMember(3, []string{"http://c"}, "", nil), } cl := newTestCluster(membs) ls := stats.NewLeaderStats("") @@ -48,7 +48,7 @@ func TestSendHubAdd(t *testing.T) { cl := newTestCluster(nil) ls := stats.NewLeaderStats("") h := newSendHub(nil, cl, nil, ls) - m := newTestMemberp(1, []string{"http://a"}, "", nil) + m := newTestMember(1, []string{"http://a"}, "", nil) h.Add(m) if _, ok := ls.Followers["1"]; !ok { @@ -71,7 +71,7 @@ func TestSendHubAdd(t *testing.T) { func TestSendHubRemove(t *testing.T) { membs := []Member{ - newTestMember(1, []string{"http://a"}, "", nil), + *newTestMember(1, []string{"http://a"}, "", nil), } cl := newTestCluster(membs) ls := stats.NewLeaderStats("") diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 25fa25bc9..965791746 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1101,25 +1101,25 @@ func TestGetOtherPeerURLs(t *testing.T) { }{ { []*Member{ - newTestMemberp(1, []string{"http://10.0.0.1"}, "a", nil), + newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), }, "a", []string{}, }, { []*Member{ - newTestMemberp(1, []string{"http://10.0.0.1"}, "a", nil), - newTestMemberp(2, []string{"http://10.0.0.2"}, "b", nil), - newTestMemberp(3, []string{"http://10.0.0.3"}, "c", nil), + newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), + newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), + newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), }, "a", []string{"http://10.0.0.2", "http://10.0.0.3"}, }, { []*Member{ - newTestMemberp(1, []string{"http://10.0.0.1"}, "a", nil), - newTestMemberp(3, []string{"http://10.0.0.3"}, "c", nil), - newTestMemberp(2, []string{"http://10.0.0.2"}, "b", nil), + newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), + newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), + newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), }, "a", []string{"http://10.0.0.2", "http://10.0.0.3"}, From 67a0de4bbcee373dbec0d79515a2b8ec2a8502cc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 11 Nov 2014 11:19:34 -0800 Subject: [PATCH 08/27] etcdserver: use member pointer for all tests --- etcdserver/cluster_test.go | 136 ++++++++++++++++++------------------- etcdserver/sender_test.go | 12 ++-- etcdserver/server_test.go | 4 +- 3 files changed, 75 insertions(+), 77 deletions(-) diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 503bc468e..101d84e8d 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -31,14 +31,14 @@ import ( func TestClusterFromString(t *testing.T) { tests := []struct { f string - mems []Member + mems []*Member }{ { "mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379", - []Member{ - *newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), - *newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil), - *newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil), + []*Member{ + newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil), + newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil), + newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil), }, }, } @@ -50,9 +50,8 @@ func TestClusterFromString(t *testing.T) { if c.token != "abc" { t.Errorf("#%d: token = %v, want abc", i, c.token) } - wc := newTestCluster(tt.mems) - if !reflect.DeepEqual(c.members, wc.members) { - t.Errorf("#%d: members = %+v, want %+v", i, c.members, wc.members) + if !reflect.DeepEqual(c.Members(), tt.mems) { + t.Errorf("#%d: members = %+v, want %+v", i, c.Members(), tt.mems) } } } @@ -80,41 +79,40 @@ func TestClusterFromStringBad(t *testing.T) { func TestClusterFromStore(t *testing.T) { tests := []struct { - mems []Member + mems []*Member }{ { - []Member{*newTestMember(1, nil, "node1", nil)}, + []*Member{newTestMember(1, nil, "node1", nil)}, }, { - []Member{}, + nil, }, { - []Member{ - *newTestMember(1, nil, "node1", nil), - *newTestMember(2, nil, "node2", nil), + []*Member{ + newTestMember(1, nil, "node1", nil), + newTestMember(2, nil, "node2", nil), }, }, } for i, tt := range tests { hc := newTestCluster(nil) for _, m := range tt.mems { - hc.AddMember(&m) + hc.AddMember(m) } c := NewClusterFromStore("abc", hc.store) if c.token != "abc" { t.Errorf("#%d: token = %v, want %v", i, c.token, "abc") } - wc := newTestCluster(tt.mems) - if !reflect.DeepEqual(c.members, wc.members) { - t.Errorf("#%d: members = %v, want %v", i, c.members, wc.members) + if !reflect.DeepEqual(c.Members(), tt.mems) { + t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems) } } } func TestClusterMember(t *testing.T) { - membs := []Member{ - *newTestMember(1, nil, "node1", nil), - *newTestMember(2, nil, "node2", nil), + membs := []*Member{ + newTestMember(1, nil, "node1", nil), + newTestMember(2, nil, "node2", nil), } tests := []struct { id types.ID @@ -137,9 +135,9 @@ func TestClusterMember(t *testing.T) { } func TestClusterMemberByName(t *testing.T) { - membs := []Member{ - *newTestMember(1, nil, "node1", nil), - *newTestMember(2, nil, "node2", nil), + membs := []*Member{ + newTestMember(1, nil, "node1", nil), + newTestMember(2, nil, "node2", nil), } tests := []struct { name string @@ -162,10 +160,10 @@ func TestClusterMemberByName(t *testing.T) { } func TestClusterMemberIDs(t *testing.T) { - c := newTestCluster([]Member{ - *newTestMember(1, nil, "", nil), - *newTestMember(4, nil, "", nil), - *newTestMember(100, nil, "", nil), + c := newTestCluster([]*Member{ + newTestMember(1, nil, "", nil), + newTestMember(4, nil, "", nil), + newTestMember(100, nil, "", nil), }) w := []types.ID{1, 4, 100} g := c.MemberIDs() @@ -176,45 +174,45 @@ func TestClusterMemberIDs(t *testing.T) { func TestClusterPeerURLs(t *testing.T) { tests := []struct { - mems []Member + mems []*Member wurls []string }{ // single peer with a single address { - mems: []Member{ - *newTestMember(1, []string{"http://192.0.2.1"}, "", nil), + mems: []*Member{ + newTestMember(1, []string{"http://192.0.2.1"}, "", nil), }, wurls: []string{"http://192.0.2.1"}, }, // single peer with a single address with a port { - mems: []Member{ - *newTestMember(1, []string{"http://192.0.2.1:8001"}, "", nil), + mems: []*Member{ + newTestMember(1, []string{"http://192.0.2.1:8001"}, "", nil), }, wurls: []string{"http://192.0.2.1:8001"}, }, // several members explicitly unsorted { - mems: []Member{ - *newTestMember(2, []string{"http://192.0.2.3", "http://192.0.2.4"}, "", nil), - *newTestMember(3, []string{"http://192.0.2.5", "http://192.0.2.6"}, "", nil), - *newTestMember(1, []string{"http://192.0.2.1", "http://192.0.2.2"}, "", nil), + mems: []*Member{ + newTestMember(2, []string{"http://192.0.2.3", "http://192.0.2.4"}, "", nil), + newTestMember(3, []string{"http://192.0.2.5", "http://192.0.2.6"}, "", nil), + newTestMember(1, []string{"http://192.0.2.1", "http://192.0.2.2"}, "", nil), }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, // no members { - mems: []Member{}, + mems: []*Member{}, wurls: []string{}, }, // peer with no peer urls { - mems: []Member{ - *newTestMember(3, []string{}, "", nil), + mems: []*Member{ + newTestMember(3, []string{}, "", nil), }, wurls: []string{}, }, @@ -231,45 +229,45 @@ func TestClusterPeerURLs(t *testing.T) { func TestClusterClientURLs(t *testing.T) { tests := []struct { - mems []Member + mems []*Member wurls []string }{ // single peer with a single address { - mems: []Member{ - *newTestMember(1, nil, "", []string{"http://192.0.2.1"}), + mems: []*Member{ + newTestMember(1, nil, "", []string{"http://192.0.2.1"}), }, wurls: []string{"http://192.0.2.1"}, }, // single peer with a single address with a port { - mems: []Member{ - *newTestMember(1, nil, "", []string{"http://192.0.2.1:8001"}), + mems: []*Member{ + newTestMember(1, nil, "", []string{"http://192.0.2.1:8001"}), }, wurls: []string{"http://192.0.2.1:8001"}, }, // several members explicitly unsorted { - mems: []Member{ - *newTestMember(2, nil, "", []string{"http://192.0.2.3", "http://192.0.2.4"}), - *newTestMember(3, nil, "", []string{"http://192.0.2.5", "http://192.0.2.6"}), - *newTestMember(1, nil, "", []string{"http://192.0.2.1", "http://192.0.2.2"}), + mems: []*Member{ + newTestMember(2, nil, "", []string{"http://192.0.2.3", "http://192.0.2.4"}), + newTestMember(3, nil, "", []string{"http://192.0.2.5", "http://192.0.2.6"}), + newTestMember(1, nil, "", []string{"http://192.0.2.1", "http://192.0.2.2"}), }, wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, // no members { - mems: []Member{}, + mems: []*Member{}, wurls: []string{}, }, // peer with no client urls { - mems: []Member{ - *newTestMember(3, nil, "", []string{}), + mems: []*Member{ + newTestMember(3, nil, "", []string{}), }, wurls: []string{}, }, @@ -286,20 +284,20 @@ func TestClusterClientURLs(t *testing.T) { func TestClusterValidateAndAssignIDsBad(t *testing.T) { tests := []struct { - clmembs []Member + clmembs []*Member membs []*Member }{ { // unmatched length - []Member{ - *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + []*Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), }, []*Member{}, }, { // unmatched peer urls - []Member{ - *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + []*Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), }, []*Member{ newTestMember(1, []string{"http://127.0.0.1:4001"}, "", nil), @@ -307,9 +305,9 @@ func TestClusterValidateAndAssignIDsBad(t *testing.T) { }, { // unmatched peer urls - []Member{ - *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), - *newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), + []*Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), }, []*Member{ newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), @@ -327,14 +325,14 @@ func TestClusterValidateAndAssignIDsBad(t *testing.T) { func TestClusterValidateAndAssignIDs(t *testing.T) { tests := []struct { - clmembs []Member + clmembs []*Member membs []*Member wids []types.ID }{ { - []Member{ - *newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), - *newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), + []*Member{ + newTestMember(1, []string{"http://127.0.0.1:2379"}, "", nil), + newTestMember(2, []string{"http://127.0.0.2:2379"}, "", nil), }, []*Member{ newTestMember(3, []string{"http://127.0.0.1:2379"}, "", nil), @@ -426,9 +424,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) { } func TestClusterGenID(t *testing.T) { - cs := newTestCluster([]Member{ - *newTestMember(1, nil, "", nil), - *newTestMember(2, nil, "", nil), + cs := newTestCluster([]*Member{ + newTestMember(1, nil, "", nil), + newTestMember(2, nil, "", nil), }) cs.genID() @@ -605,11 +603,11 @@ func TestNodeToMember(t *testing.T) { } } -func newTestCluster(membs []Member) *Cluster { +func newTestCluster(membs []*Member) *Cluster { c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} c.store = store.New() for i := range membs { - c.AddMember(&membs[i]) + c.AddMember(membs[i]) } return c } diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index b193c2657..25b928b7c 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -24,10 +24,10 @@ import ( ) func TestSendHubInitSenders(t *testing.T) { - membs := []Member{ - *newTestMember(1, []string{"http://a"}, "", nil), - *newTestMember(2, []string{"http://b"}, "", nil), - *newTestMember(3, []string{"http://c"}, "", nil), + membs := []*Member{ + newTestMember(1, []string{"http://a"}, "", nil), + newTestMember(2, []string{"http://b"}, "", nil), + newTestMember(3, []string{"http://c"}, "", nil), } cl := newTestCluster(membs) ls := stats.NewLeaderStats("") @@ -70,8 +70,8 @@ func TestSendHubAdd(t *testing.T) { } func TestSendHubRemove(t *testing.T) { - membs := []Member{ - *newTestMember(1, []string{"http://a"}, "", nil), + membs := []*Member{ + newTestMember(1, []string{"http://a"}, "", nil), } cl := newTestCluster(membs) ls := stats.NewLeaderStats("") diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 965791746..5361b264e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -407,7 +407,7 @@ func TestApplyRequest(t *testing.T) { } func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { - cl := newTestCluster([]Member{{ID: 1}}) + cl := newTestCluster([]*Member{{ID: 1}}) srv := &EtcdServer{ store: &storeRecorder{}, Cluster: cl, @@ -992,7 +992,7 @@ func TestRemoveMember(t *testing.T) { Nodes: []uint64{1234, 2345, 3456}, }, } - cl := newTestCluster([]Member{{ID: 1234}}) + cl := newTestCluster([]*Member{{ID: 1234}}) s := &EtcdServer{ node: n, store: &storeRecorder{}, From b6f0c789b894d1b124fa3b64d2bef5ae73aae88b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 11 Nov 2014 11:51:57 -0800 Subject: [PATCH 09/27] transport: create a tls listener only if the tlsInfo is not empty and the scheme is HTTPS --- etcdmain/etcd.go | 6 +++--- pkg/transport/listener.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index cdbe222f2..e480c600f 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -222,7 +222,7 @@ func startEtcd() error { plns := make([]net.Listener, 0) for _, u := range lpurls { var l net.Listener - l, err = transport.NewListener(u.Host, peerTLSInfo) + l, err = transport.NewListener(u.Host, u.Scheme, peerTLSInfo) if err != nil { return err } @@ -246,7 +246,7 @@ func startEtcd() error { clns := make([]net.Listener, 0) for _, u := range lcurls { var l net.Listener - l, err = transport.NewListener(u.Host, clientTLSInfo) + l, err = transport.NewListener(u.Host, u.Scheme, clientTLSInfo) if err != nil { return err } @@ -349,7 +349,7 @@ func startProxy() error { } // Start a proxy server goroutine for each listen address for _, u := range lcurls { - l, err := transport.NewListener(u.Host, clientTLSInfo) + l, err := transport.NewListener(u.Host, u.Scheme, clientTLSInfo) if err != nil { return err } diff --git a/pkg/transport/listener.go b/pkg/transport/listener.go index b4172dde7..a8d3eac5c 100644 --- a/pkg/transport/listener.go +++ b/pkg/transport/listener.go @@ -27,13 +27,13 @@ import ( "time" ) -func NewListener(addr string, info TLSInfo) (net.Listener, error) { +func NewListener(addr string, scheme string, info TLSInfo) (net.Listener, error) { l, err := net.Listen("tcp", addr) if err != nil { return nil, err } - if !info.Empty() { + if !info.Empty() && scheme == "https" { cfg, err := info.ServerConfig() if err != nil { return nil, err From 596779400900221aedac7e6d3610fb03b9274578 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 11 Nov 2014 11:46:49 -0800 Subject: [PATCH 10/27] *: support updating advertise-peer-url Users might want to update the peerurl of the etcd member in several cases. For example, if the IP address of the physical machine etcd running on is changed, user need to update the adversite-pee-rurl accordingly. This commit makes etcd support updating the advertise-peer-url of its members. --- etcdserver/cluster.go | 45 +++++- etcdserver/cluster_test.go | 55 ++++++- etcdserver/etcdhttp/client.go | 91 +++++++---- etcdserver/etcdhttp/client_test.go | 193 +++++++++++++++++++++++- etcdserver/etcdhttp/http_test.go | 3 + etcdserver/etcdhttp/httptypes/member.go | 4 + etcdserver/sender.go | 23 +++ etcdserver/server.go | 32 +++- etcdserver/server_test.go | 39 ++++- raft/node.go | 2 + raft/raftpb/raft.pb.go | 3 + raft/raftpb/raft.proto | 1 + 12 files changed, 452 insertions(+), 39 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 461488f51..362b91569 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -263,12 +263,13 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st } // ensures that it is still valid. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { members, removed := membersFromStore(c.store) - if removed[types.ID(cc.NodeID)] { + id := types.ID(cc.NodeID) + if removed[id] { return ErrIDRemoved } switch cc.Type { case raftpb.ConfChangeAddNode: - if members[types.ID(cc.NodeID)] != nil { + if members[id] != nil { return ErrIDExists } urls := make(map[string]bool) @@ -287,11 +288,33 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } } case raftpb.ConfChangeRemoveNode: - if members[types.ID(cc.NodeID)] == nil { + if members[id] == nil { return ErrIDNotFound } + case raftpb.ConfChangeUpdateNode: + if members[id] == nil { + return ErrIDNotFound + } + urls := make(map[string]bool) + for _, m := range members { + if m.ID == id { + continue + } + for _, u := range m.PeerURLs { + urls[u] = true + } + } + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + for _, u := range m.PeerURLs { + if urls[u] { + return ErrPeerURLexists + } + } default: - log.Panicf("ConfChange type should be either AddNode or RemoveNode") + log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode") } return nil } @@ -341,6 +364,20 @@ func (c *Cluster) UpdateMemberAttributes(id types.ID, attr Attributes) { c.members[id].Attributes = attr } +func (c *Cluster) UpdateMember(nm *Member) { + c.Lock() + defer c.Unlock() + b, err := json.Marshal(nm.RaftAttributes) + if err != nil { + log.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(memberStoreKey(nm.ID), raftAttributesSuffix) + if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { + log.Panicf("update raftAttributes should never fail: %v", err) + } + c.members[nm.ID].RaftAttributes = nm.RaftAttributes +} + // nodeToMember builds member through a store node. // the child nodes of the given node should be sorted by key. func nodeToMember(n *store.NodeExtern) (*Member, error) { diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 101d84e8d..08c34a9b5 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -362,7 +362,25 @@ func TestClusterValidateConfigurationChange(t *testing.T) { cl.RemoveMember(4) attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} - cxt, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 3)}} + ctx2to3, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx2to5, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) if err != nil { t.Fatal(err) } @@ -403,7 +421,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) { raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, NodeID: 5, - Context: cxt, + Context: ctx, }, ErrPeerURLexists, }, @@ -414,6 +432,39 @@ func TestClusterValidateConfigurationChange(t *testing.T) { }, ErrIDNotFound, }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: 5, + Context: ctx5, + }, + nil, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 5, + Context: ctx, + }, + ErrIDNotFound, + }, + // try to change the peer url of 2 to the peer url of 3 + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to3, + }, + ErrPeerURLexists, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to5, + }, + nil, + }, } for i, tt := range tests { err := cl.ValidateConfigurationChange(tt.cc) diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 576c61431..e9e47ed7a 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -148,7 +148,7 @@ type membersHandler struct { } func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET", "POST", "DELETE") { + if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { return } w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) @@ -168,25 +168,13 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "POST": - ctype := r.Header.Get("Content-Type") - if ctype != "application/json" { - writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) - return - } - b, err := ioutil.ReadAll(r.Body) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) - return - } req := httptypes.MemberCreateRequest{} - if err := json.Unmarshal(b, &req); err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + if ok := unmarshalRequest(r, &req, w); !ok { return } - now := h.clock.Now() m := etcdserver.NewMember("", req.PeerURLs, "", &now) - err = h.server.AddMember(ctx, *m) + err := h.server.AddMember(ctx, *m) switch { case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists: writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) @@ -203,28 +191,47 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "DELETE": - idStr := trimPrefix(r.URL.Path, membersPrefix) - if idStr == "" { - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + id, ok := getID(r.URL.Path, w) + if !ok { return } - id, err := types.IDFromString(idStr) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) - return - } - err = h.server.RemoveMember(ctx, uint64(id)) + err := h.server.RemoveMember(ctx, uint64(id)) switch { case err == etcdserver.ErrIDRemoved: - writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) case err == etcdserver.ErrIDNotFound: - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) case err != nil: log.Printf("etcdhttp: error removing node %s: %v", id, err) writeError(w, err) default: w.WriteHeader(http.StatusNoContent) } + case "PUT": + id, ok := getID(r.URL.Path, w) + if !ok { + return + } + req := httptypes.MemberUpdateRequest{} + if ok := unmarshalRequest(r, &req, w); !ok { + return + } + m := etcdserver.Member{ + ID: id, + RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, + } + err := h.server.UpdateMember(ctx, m) + switch { + case err == etcdserver.ErrPeerURLexists: + writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) + case err == etcdserver.ErrIDNotFound: + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) + case err != nil: + log.Printf("etcdhttp: error updating node %s: %v", m.ID, err) + writeError(w, err) + default: + w.WriteHeader(http.StatusNoContent) + } } } @@ -506,6 +513,38 @@ func trimErrorPrefix(err error, prefix string) error { return err } +func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { + ctype := r.Header.Get("Content-Type") + if ctype != "application/json" { + writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) + return false + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + if err := req.UnmarshalJSON(b); err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + return true +} + +func getID(p string, w http.ResponseWriter) (types.ID, bool) { + idStr := trimPrefix(p, membersPrefix) + if idStr == "" { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return 0, false + } + id, err := types.IDFromString(idStr) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + return 0, false + } + return id, true +} + // getUint64 extracts a uint64 by the given key from a Form. If the key does // not exist in the form, 0 is returned. If the key exists but the value is // badly formed, an error is returned. If multiple values are present only the diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go index 78e0a4e0a..bf4f21e6c 100644 --- a/etcdserver/etcdhttp/client_test.go +++ b/etcdserver/etcdhttp/client_test.go @@ -111,6 +111,11 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { return nil } +func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error { + s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}}) + return nil +} + type action struct { name string params []interface{} @@ -136,11 +141,12 @@ type resServer struct { func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { return rs.res, nil } -func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} -func (rs *resServer) Stop() {} -func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } -func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } +func (rs *resServer) Start() {} +func (rs *resServer) Stop() {} +func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil } func boolp(b bool) *bool { return &b } @@ -698,6 +704,48 @@ func TestServeMembersDelete(t *testing.T) { } } +func TestServeMembersUpdate(t *testing.T) { + u := mustNewURL(t, path.Join(membersPrefix, "1")) + b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`) + req, err := http.NewRequest("PUT", u.String(), bytes.NewReader(b)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + s := &serverRecorder{} + h := &membersHandler{ + server: s, + clock: clockwork.NewFakeClock(), + clusterInfo: &fakeCluster{id: 1}, + } + rw := httptest.NewRecorder() + + h.ServeHTTP(rw, req) + + wcode := http.StatusNoContent + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + + gcid := rw.Header().Get("X-Etcd-Cluster-ID") + wcid := h.clusterInfo.ID().String() + if gcid != wcid { + t.Errorf("cid = %s, want %s", gcid, wcid) + } + + wm := etcdserver.Member{ + ID: 1, + RaftAttributes: etcdserver.RaftAttributes{ + PeerURLs: []string{"http://127.0.0.1:1"}, + }, + } + + wactions := []action{{name: "UpdateMember", params: []interface{}{wm}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + func TestServeMembersFail(t *testing.T) { tests := []struct { req *http.Request @@ -855,6 +903,104 @@ func TestServeMembersFail(t *testing.T) { }, nil, + http.StatusMethodNotAllowed, + }, + { + // parse body error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader("bad json")), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &resServer{}, + + http.StatusBadRequest, + }, + { + // bad content type + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/bad"}}, + }, + &errServer{}, + + http.StatusUnsupportedMediaType, + }, + { + // bad url + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{}, + + http.StatusBadRequest, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrPeerURLexists, + }, + + http.StatusConflict, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrIDNotFound, + }, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember error with badly formed ID + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "bad_id")), + Method: "PUT", + }, + nil, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember with no ID + &http.Request{ + URL: mustNewURL(t, membersPrefix), + Method: "PUT", + }, + nil, + http.StatusMethodNotAllowed, }, } @@ -995,6 +1141,43 @@ func TestServeMachines(t *testing.T) { } } +func TestGetID(t *testing.T) { + tests := []struct { + path string + + wok bool + wid types.ID + wcode int + }{ + { + "123", + true, 0x123, http.StatusOK, + }, + { + "bad_id", + false, 0, http.StatusNotFound, + }, + { + "", + false, 0, http.StatusMethodNotAllowed, + }, + } + + for i, tt := range tests { + w := httptest.NewRecorder() + id, ok := getID(tt.path, w) + if id != tt.wid { + t.Errorf("#%d: id = %d, want %d", i, id, tt.wid) + } + if ok != tt.wok { + t.Errorf("#%d: ok = %t, want %t", i, ok, tt.wok) + } + if w.Code != tt.wcode { + t.Errorf("#%d code = %d, want %d", i, w.Code, tt.wcode) + } + } +} + type dummyStats struct { data []byte } diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 476a10725..9813ed47d 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -79,6 +79,9 @@ func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { return fs.err } +func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error { + return fs.err +} func TestWriteError(t *testing.T) { // nil error should not panic diff --git a/etcdserver/etcdhttp/httptypes/member.go b/etcdserver/etcdhttp/httptypes/member.go index 6d8554ed0..a71457bcb 100644 --- a/etcdserver/etcdhttp/httptypes/member.go +++ b/etcdserver/etcdhttp/httptypes/member.go @@ -33,6 +33,10 @@ type MemberCreateRequest struct { PeerURLs types.URLs } +type MemberUpdateRequest struct { + MemberCreateRequest +} + func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) { s := struct { PeerURLs []string `json:"peerURLs"` diff --git a/etcdserver/sender.go b/etcdserver/sender.go index dbe846f76..b2512057c 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -21,6 +21,9 @@ import ( "fmt" "log" "net/http" + "net/url" + "path" + "sync" "time" "github.com/coreos/etcd/etcdserver/stats" @@ -108,12 +111,30 @@ func (h *sendHub) Remove(id types.ID) { delete(h.senders, id) } +func (h *sendHub) Update(m *Member) { + // TODO: return error or just panic? + if _, ok := h.senders[m.ID]; !ok { + return + } + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + s := h.senders[m.ID] + s.mu.Lock() + defer s.mu.Unlock() + s.u = u.String() +} + type sender struct { u string cid types.ID c *http.Client fs *stats.FollowerStats q chan []byte + mu sync.RWMutex } func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender { @@ -159,7 +180,9 @@ func (s *sender) handle() { // post POSTs a data payload to a url. Returns nil if the POST succeeds, // error on any failure. func (s *sender) post(data []byte) error { + s.mu.RLock() req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) + s.mu.RUnlock() if err != nil { return fmt.Errorf("new request to %s error: %v", s.u, err) } diff --git a/etcdserver/server.go b/etcdserver/server.go index a33f3c227..19072c765 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -89,6 +89,7 @@ type Sender interface { Send(m []raftpb.Message) Add(m *Member) Remove(id types.ID) + Update(m *Member) Stop() } @@ -114,7 +115,7 @@ type Server interface { // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. Stop() - // Do takes a request and attempts to fulfil it, returning a Response. + // Do takes a request and attempts to fulfill it, returning a Response. Do(ctx context.Context, r pb.Request) (Response, error) // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. @@ -127,6 +128,10 @@ type Server interface { // return ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDNotFound if member ID is not in the cluster. RemoveMember(ctx context.Context, id uint64) error + + // UpdateMember attempts to update a existing member in the cluster. It will + // return ErrIDNotFound if the member ID does not exist. + UpdateMember(ctx context.Context, updateMemb Member) error } type Stats interface { @@ -475,6 +480,20 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { return s.configure(ctx, cc) } +func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { + b, err := json.Marshal(memb) + if err != nil { + return err + } + cc := raftpb.ConfChange{ + ID: GenID(), + Type: raftpb.ConfChangeUpdateNode, + NodeID: uint64(memb.ID), + Context: b, + } + return s.configure(ctx, cc) +} + // Implement the RaftTimer interface func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) @@ -672,6 +691,17 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { s.Cluster.RemoveMember(id) s.sender.Remove(id) log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID()) + case raftpb.ConfChangeUpdateNode: + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + if cc.NodeID != uint64(m.ID) { + log.Panicf("nodeID should always be equal to member ID") + } + s.Cluster.UpdateMember(m) + s.sender.Update(m) + log.Printf("etcdserver: update node %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } return nil } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5361b264e..566e8cc21 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -447,7 +447,7 @@ func TestApplyConfChangeError(t *testing.T) { }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, + Type: raftpb.ConfChangeUpdateNode, NodeID: 4, }, ErrIDRemoved, @@ -503,6 +503,7 @@ func (s *fakeSender) Send(msgs []raftpb.Message) { } } func (s *fakeSender) Add(m *Member) {} +func (s *fakeSender) Update(m *Member) {} func (s *fakeSender) Remove(id types.ID) {} func (s *fakeSender) Stop() {} @@ -1017,6 +1018,41 @@ func TestRemoveMember(t *testing.T) { } } +// TestUpdateMember tests RemoveMember can propose and perform node update. +func TestUpdateMember(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{ + RaftState: raft.StateLeader, + Nodes: []uint64{1234, 2345, 3456}, + }, + } + cl := newTestCluster([]*Member{{ID: 1234}}) + s := &EtcdServer{ + node: n, + store: &storeRecorder{}, + sender: &nopSender{}, + storage: &storageRecorder{}, + Cluster: cl, + } + s.start() + wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} + err := s.UpdateMember(context.TODO(), wm) + gaction := n.Action() + s.Stop() + + if err != nil { + t.Fatalf("UpdateMember error: %v", err) + } + wactions := []action{action{name: "ProposeConfChange:ConfChangeUpdateNode"}, action{name: "ApplyConfChange:ConfChangeUpdateNode"}} + if !reflect.DeepEqual(gaction, wactions) { + t.Errorf("action = %v, want %v", gaction, wactions) + } + if !reflect.DeepEqual(cl.Member(1234), &wm) { + t.Errorf("member = %v, want %v", cl.Member(1234), &wm) + } +} + // TODO: test server could stop itself when being removed // TODO: test wait trigger correctness in multi-server case @@ -1446,6 +1482,7 @@ type nopSender struct{} func (s *nopSender) Send(m []raftpb.Message) {} func (s *nopSender) Add(m *Member) {} func (s *nopSender) Remove(id types.ID) {} +func (s *nopSender) Update(m *Member) {} func (s *nopSender) Stop() {} func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { diff --git a/raft/node.go b/raft/node.go index df299c56a..a6f269826 100644 --- a/raft/node.go +++ b/raft/node.go @@ -271,6 +271,8 @@ func (n *node) run(r *raft) { r.addNode(cc.NodeID) case pb.ConfChangeRemoveNode: r.removeNode(cc.NodeID) + case pb.ConfChangeUpdateNode: + r.resetPendingConf() default: panic("unexpected conf type") } diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index db726576c..03bdb8c83 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -120,15 +120,18 @@ type ConfChangeType int32 const ( ConfChangeAddNode ConfChangeType = 0 ConfChangeRemoveNode ConfChangeType = 1 + ConfChangeUpdateNode ConfChangeType = 2 ) var ConfChangeType_name = map[int32]string{ 0: "ConfChangeAddNode", 1: "ConfChangeRemoveNode", + 2: "ConfChangeUpdateNode", } var ConfChangeType_value = map[string]int32{ "ConfChangeAddNode": 0, "ConfChangeRemoveNode": 1, + "ConfChangeUpdateNode": 2, } func (x ConfChangeType) Enum() *ConfChangeType { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 47c9ec2ef..7b60393b8 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -60,6 +60,7 @@ message HardState { enum ConfChangeType { ConfChangeAddNode = 0; ConfChangeRemoveNode = 1; + ConfChangeUpdateNode = 2; } message ConfChange { From 7dba92dd534af6afbf083fa0b0b33d41dc205541 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 11 Nov 2014 16:10:03 -0800 Subject: [PATCH 11/27] raft: update unstable when calling stableTo with 0 It should update unstable in this case because it may happen that raft only writes entry 0 into stable storage. --- raft/log.go | 3 --- raft/log_test.go | 23 ++++++++++++++++++++++- raft/node.go | 4 +--- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/raft/log.go b/raft/log.go index 496a1043c..49666c80c 100644 --- a/raft/log.go +++ b/raft/log.go @@ -134,9 +134,6 @@ func (l *raftLog) appliedTo(i uint64) { } func (l *raftLog) stableTo(i uint64) { - if i == 0 { - return - } l.unstable = i + 1 } diff --git a/raft/log_test.go b/raft/log_test.go index 9ea3aba51..9654d76aa 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -335,6 +335,7 @@ func TestUnstableEnts(t *testing.T) { }{ {3, nil, 3}, {1, previousEnts, 3}, + {0, append([]pb.Entry{{}}, previousEnts...), 3}, } for i, tt := range tests { @@ -342,7 +343,9 @@ func TestUnstableEnts(t *testing.T) { raftLog.append(0, previousEnts...) raftLog.unstable = tt.unstable ents := raftLog.unstableEnts() - raftLog.stableTo(raftLog.lastIndex()) + if l := len(ents); l > 0 { + raftLog.stableTo(ents[l-1].Index) + } if !reflect.DeepEqual(ents, tt.wents) { t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents) } @@ -352,6 +355,24 @@ func TestUnstableEnts(t *testing.T) { } } +func TestStableTo(t *testing.T) { + tests := []struct { + stable uint64 + wunstable uint64 + }{ + {0, 1}, + {1, 2}, + {2, 3}, + } + for i, tt := range tests { + raftLog := newLog() + raftLog.stableTo(tt.stable) + if raftLog.unstable != tt.wunstable { + t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable) + } + } +} + //TestCompaction ensures that the number of log entreis is correct after compactions. func TestCompaction(t *testing.T) { tests := []struct { diff --git a/raft/node.go b/raft/node.go index df299c56a..c1b0ce8c7 100644 --- a/raft/node.go +++ b/raft/node.go @@ -298,9 +298,7 @@ func (n *node) run(r *raft) { if prevHardSt.Commit != 0 { r.raftLog.appliedTo(prevHardSt.Commit) } - if prevLastUnstablei != 0 { - r.raftLog.stableTo(prevLastUnstablei) - } + r.raftLog.stableTo(prevLastUnstablei) advancec = nil case <-n.done: return From 78cbb1512cc1fd5f8859251d0c7ad6c8d04973ed Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 11 Nov 2014 21:11:09 -0800 Subject: [PATCH 12/27] raft: nodes return sorted ids This makes raft.softState return the same result when its soft state is not changed. --- raft/raft.go | 1 + raft/raft_test.go | 24 ++++++++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 3babd1f26..376f98468 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -531,6 +531,7 @@ func (r *raft) nodes() []uint64 { for k := range r.prs { nodes = append(nodes, k) } + sort.Sort(uint64Slice(nodes)) return nodes } diff --git a/raft/raft_test.go b/raft/raft_test.go index 2acf2e636..5677e36dc 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -997,7 +997,6 @@ func TestRestore(t *testing.T) { t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term) } sg := sm.nodes() - sort.Sort(uint64Slice(sg)) if !reflect.DeepEqual(sg, s.Nodes) { t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes) } @@ -1166,7 +1165,6 @@ func TestAddNode(t *testing.T) { t.Errorf("pendingConf = %v, want false", r.pendingConf) } nodes := r.nodes() - sort.Sort(uint64Slice(nodes)) wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) @@ -1210,6 +1208,28 @@ func TestPromotable(t *testing.T) { } } +func TestRaftNodes(t *testing.T) { + tests := []struct { + ids []uint64 + wids []uint64 + }{ + { + []uint64{1, 2, 3}, + []uint64{1, 2, 3}, + }, + { + []uint64{3, 2, 1}, + []uint64{1, 2, 3}, + }, + } + for i, tt := range tests { + r := newRaft(1, tt.ids, 10, 1) + if !reflect.DeepEqual(r.nodes(), tt.wids) { + t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) + } + } +} + func ents(terms ...uint64) *raft { ents := []pb.Entry{{}} for _, term := range terms { From 0aa8258d29cdbe295fd39de0450fdeba51da3cc2 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 12 Nov 2014 10:45:35 -0800 Subject: [PATCH 13/27] etcdserver: use member instead of node at etcd level --- etcdserver/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 19072c765..1794f92b1 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -685,12 +685,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { } s.Cluster.AddMember(m) s.sender.Add(m) - log.Printf("etcdserver: added node %s %v to cluster %s", types.ID(cc.NodeID), m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: added member %s %v to cluster %s", types.ID(cc.NodeID), m.PeerURLs, s.Cluster.ID()) case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) s.Cluster.RemoveMember(id) s.sender.Remove(id) - log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID()) + log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) case raftpb.ConfChangeUpdateNode: m := new(Member) if err := json.Unmarshal(cc.Context, m); err != nil { @@ -701,7 +701,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { } s.Cluster.UpdateMember(m) s.sender.Update(m) - log.Printf("etcdserver: update node %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } return nil } @@ -773,7 +773,7 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, w * peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } id = member.ID - log.Printf("etcdserver: start node %s in cluster %s", id, cfg.Cluster.ID()) + log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) n = raft.StartNode(uint64(id), peers, 10, 1) return } From fe0325fce7d7b9dbc5ddca8638cdfeef6c8419ca Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 12 Nov 2014 11:05:25 -0800 Subject: [PATCH 14/27] raft: add comment string for TestNodeStart --- raft/node.go | 2 +- raft/node_test.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/raft/node.go b/raft/node.go index 003db937f..b4e2e6f51 100644 --- a/raft/node.go +++ b/raft/node.go @@ -143,7 +143,7 @@ type Peer struct { // StartNode returns a new Node given a unique raft id, a list of raft peers, and // the election and heartbeat timeouts in units of ticks. -// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log. +// It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(id uint64, peers []Peer, election, heartbeat int) Node { n := newNode() r := newRaft(id, nil, election, heartbeat) diff --git a/raft/node_test.go b/raft/node_test.go index e25513f67..a101e230a 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -161,7 +161,10 @@ func TestReadyContainUpdates(t *testing.T) { } } -func TestNode(t *testing.T) { +// TestNodeStart ensures that a node can be started correctly. The node should +// start with correct configuration change entries, and can accept and commit +// proposals. +func TestNodeStart(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 45c36a08086a8a5caf5b87c63e4695af11daa145 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 12 Nov 2014 11:39:22 -0800 Subject: [PATCH 15/27] raft: add a test for node.Tick --- raft/node_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/raft/node_test.go b/raft/node_test.go index e25513f67..20e0fd4a4 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -140,6 +140,20 @@ func TestBlockProposal(t *testing.T) { } } +// TestNodeTick ensures that node.Tick() will increase the +// elapsed of the underly raft state machine. +func TestNodeTick(t *testing.T) { + n := newNode() + r := newRaft(1, []uint64{1}, 10, 1) + go n.run(r) + elapsed := r.elapsed + n.Tick() + n.Stop() + if r.elapsed != elapsed+1 { + t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1) + } +} + func TestReadyContainUpdates(t *testing.T) { tests := []struct { rd Ready From 3f358b6d5de7a2286defab37c887dd1da4cef8a4 Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Mon, 10 Nov 2014 12:55:35 -0800 Subject: [PATCH 16/27] etcdserver: ensure initial-advertise-peer-urls match initial-cluster This adds a check to setupCluster to ensure that the list of URLs specified in `initial-advertise-peer-urls` matches those configured in `initial-cluster` for this node. Also updates the documentation to clarify this and address some changes in wording. --- Documentation/0.5/clustering.md | 30 +++++++++------ etcdmain/etcd.go | 29 +++++++++++++- etcdmain/etcd_test.go | 67 +++++++++++++++++++++++++++++++-- etcdserver/cluster.go | 12 +++++- etcdserver/config.go | 2 +- 5 files changed, 119 insertions(+), 21 deletions(-) diff --git a/Documentation/0.5/clustering.md b/Documentation/0.5/clustering.md index 6a8c3eb69..d088dd9ec 100644 --- a/Documentation/0.5/clustering.md +++ b/Documentation/0.5/clustering.md @@ -10,7 +10,7 @@ This guide will walk you through configuring a three machine etcd cluster with t ## Static -As we know the cluster members, their addresses and the size of the cluster before starting we can use an offline bootstrap configuration. Each machine will get either the following command line or environment variables: +As we know the cluster members, their addresses and the size of the cluster before starting, we can use an offline bootstrap configuration by setting the `initial-cluster` flag. Each machine will get either the following command line or environment variables: ``` ETCD_INITIAL_CLUSTER="infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380" @@ -22,6 +22,8 @@ ETCD_INITIAL_CLUSTER_STATE=new -initial-cluster-state new ``` +Note that the URLs specified in `initial-cluster` are the _advertised peer URLs_, i.e. they should match the value of `initial-advertise-peer-urls` on the respective nodes. + If you are spinning up multiple clusters (or creating and destroying a single cluster) with same configuration for testing purpose, it is highly recommended that you specify a unique `initial-cluster-token` for the different clusters. By doing this, etcd can generate unique cluster IDs and member IDs for the clusters even if they otherwise have the exact same configuration. This can protect you from cross-cluster-interaction, which might corrupt your clusters. On each machine you would start etcd with these flags: @@ -45,11 +47,11 @@ $ etcd -name infra2 -initial-advertise-peer-urls https://10.0.1.12:2380 \ -initial-cluster-state new ``` -The command line parameters starting with `-initial-cluster` will be ignored on subsequent runs of etcd. You are free to remove the environment variables or command line flags after the initial bootstrap process. If you need to make changes to the configuration later see our guide on [runtime configuration](runtime-configuration.md). +The command line parameters starting with `-initial-cluster` will be ignored on subsequent runs of etcd. You are free to remove the environment variables or command line flags after the initial bootstrap process. If you need to make changes to the configuration later (for example, adding or removing members to/from the cluster), see the [runtime configuration](runtime-configuration.md) guide. ### Error Cases -In the following case we have not included our new host in the list of enumerated nodes. If this is a new cluster, the node must be added to the list of initial cluster members. +In the following example, we have not included our new host in the list of enumerated nodes. If this is a new cluster, the node _must_ be added to the list of initial cluster members. ``` $ etcd -name infra1 -initial-advertise-peer-urls http://10.0.1.11:2380 \ @@ -59,13 +61,13 @@ etcd: infra1 not listed in the initial cluster config exit 1 ``` -In this case we are attempting to map a node (infra0) on a different address (127.0.0.1:2380) than its enumerated address in the cluster list (10.0.1.10:2380). If this node is to listen on multiple addresses, all addresses must be reflected in the "initial-cluster" configuration directive. +In this example, we are attempting to map a node (infra0) on a different address (127.0.0.1:2380) than its enumerated address in the cluster list (10.0.1.10:2380). If this node is to listen on multiple addresses, all addresses _must_ be reflected in the "initial-cluster" configuration directive. ``` $ etcd -name infra0 -initial-advertise-peer-urls http://127.0.0.1:2380 \ -initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \ -initial-cluster-state=new -etcd: infra0 has different advertised URLs in the cluster and advertised peer URLs list +etcd: error setting up initial cluster: infra0 has different advertised URLs in the cluster and advertised peer URLs list exit 1 ``` @@ -81,7 +83,7 @@ exit 1 ## Discovery -In a number of cases you might not know the IPs of your cluster peers ahead of time. This is common when utilizing cloud providers or when your network uses DHCP. In these cases you can use an existing etcd cluster to bootstrap a new one. We call this process "discovery". +In a number of cases, you might not know the IPs of your cluster peers ahead of time. This is common when utilizing cloud providers or when your network uses DHCP. In these cases, rather than specifying a static configuration, you can use an existing etcd cluster to bootstrap a new one. We call this process "discovery". ### Lifetime of a Discovery URL @@ -99,7 +101,7 @@ Discovery uses an existing cluster to bootstrap itself. If you are using your ow $ curl -X PUT https://myetcd.local/v2/keys/discovery/6c007a14875d53d9bf0ef5a6fc0257c817f0fb83/_config/size -d value=3 ``` -By setting the size key to the URL, you create a discovery URL with expected-cluster-size of 3. +By setting the size key to the URL, you create a discovery URL with an expected cluster size of 3. If you bootstrap an etcd cluster using discovery service with more than the expected number of etcd members, the extra etcd processes will [fall back][fall-back] to being [proxies][proxy] by default. @@ -124,14 +126,14 @@ This will cause each member to register itself with the custom etcd discovery se ### Public discovery service -If you do not have access to an existing cluster you can use the public discovery service hosted at discovery.etcd.io. You can create a private discovery URL using the "new" endpoint like so: +If you do not have access to an existing cluster, you can use the public discovery service hosted at `discovery.etcd.io`. You can create a private discovery URL using the "new" endpoint like so: ``` $ curl https://discovery.etcd.io/new?size=3 https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de ``` -This will create the cluster with an initial expected size of 3 members. If you do not specify a size a default of 3 will be used. +This will create the cluster with an initial expected size of 3 members. If you do not specify a size, a default of 3 will be used. If you bootstrap an etcd cluster using discovery service with more than the expected number of etcd members, the extra etcd processes will [fall back][fall-back] to being [proxies][proxy] by default. @@ -169,6 +171,7 @@ You can use the environment variable `ETCD_DISCOVERY_PROXY` to cause etcd to use #### Discovery Server Errors + ``` $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \ -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de @@ -178,10 +181,13 @@ exit 1 #### User Errors +This error will occur if the discovery cluster already has the configured number of members, and `discovery-fallback` is explicitly disabled + ``` $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \ - -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de -etcd: error: the cluster using discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de has already started with all 5 members + -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de \ + -discovery-fallback exit +etcd: discovery: cluster is full exit 1 ``` @@ -193,7 +199,7 @@ ignored on this machine. ``` $ etcd -name infra0 -initial-advertise-peer-urls http://10.0.1.10:2380 \ -discovery https://discovery.etcd.io/3e86b59982e49066c5d813af1c2e2579cbf573de -etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in /var/lib/etcd +etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in /var/lib/etcd ``` # 0.4 to 0.5+ Migration Guide diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index e480c600f..8af5e432b 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -22,7 +22,10 @@ import ( "log" "net" "net/http" + "net/url" "os" + "reflect" + "sort" "strings" "github.com/coreos/etcd/discovery" @@ -363,7 +366,7 @@ func startProxy() error { return nil } -// setupCluster sets up the cluster definition for bootstrap or discovery. +// setupCluster sets up an initial cluster definition for bootstrap or discovery. func setupCluster() (*etcdserver.Cluster, error) { set := make(map[string]bool) fs.Visit(func(f *flag.Flag) { @@ -380,18 +383,40 @@ func setupCluster() (*etcdserver.Cluster, error) { var cls *etcdserver.Cluster switch { case set["discovery"]: + // If using discovery, generate a temporary cluster based on + // self's advertised peer URLs clusterStr := genClusterString(*name, apurls) cls, err = etcdserver.NewClusterFromString(*durl, clusterStr) case set["initial-cluster"]: fallthrough default: // We're statically configured, and cluster has appropriately been set. - // Try to configure by indexing the static cluster by name. cls, err = etcdserver.NewClusterFromString(*initialClusterToken, *initialCluster) + // Ensure our own advertised peer URLs match those specified in cluster + if err == nil && !clusterPeerURLsMatch(*name, cls, apurls) { + cls = nil + err = fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", *name) + } } return cls, err } +// clusterPeerURLsMatch checks whether the peer URLs of the member by the given +// name in the given cluster match the provided set of URLs +func clusterPeerURLsMatch(name string, cls *etcdserver.Cluster, urls []url.URL) bool { + m := cls.MemberByName(name) + if m == nil { + // should never happen + log.Panicf("could not find %q in cluster!", name) + } + purls := make([]string, len(urls)) + for i, u := range urls { + purls[i] = u.String() + } + sort.Strings(purls) + return reflect.DeepEqual(purls, m.PeerURLs) +} + func genClusterString(name string, urls types.URLs) string { addrs := make([]string, 0) for _, u := range urls { diff --git a/etcdmain/etcd_test.go b/etcdmain/etcd_test.go index 504e3cacf..147c6e05d 100644 --- a/etcdmain/etcd_test.go +++ b/etcdmain/etcd_test.go @@ -17,11 +17,73 @@ package etcdmain import ( + "net/url" "testing" + "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/pkg/types" ) +func mustClsFromString(t *testing.T, s string) *etcdserver.Cluster { + cls, err := etcdserver.NewClusterFromString("", s) + if err != nil { + t.Fatalf("error creating cluster from %q: %v", s, err) + } + return cls +} + +func mustNewURLs(t *testing.T, urls []string) []url.URL { + u, err := types.NewURLs(urls) + if err != nil { + t.Fatalf("unexpected new urls error: %v", err) + } + return u +} + +func TestClusterPeerURLsMatch(t *testing.T) { + tests := []struct { + name string + cls *etcdserver.Cluster + urls []url.URL + + w bool + }{ + { + name: "default", + cls: mustClsFromString(t, "default=http://localhost:12345"), + urls: mustNewURLs(t, []string{"http://localhost:12345"}), + + w: true, + }, + { + name: "default", + cls: mustClsFromString(t, "default=http://localhost:7001,other=http://192.168.0.1:7002,default=http://localhost:12345"), + urls: mustNewURLs(t, []string{"http://localhost:7001", "http://localhost:12345"}), + + w: true, + }, + { + name: "infra1", + cls: mustClsFromString(t, "infra1=http://localhost:7001"), + urls: mustNewURLs(t, []string{"http://localhost:12345"}), + + w: false, + }, + { + name: "infra1", + cls: mustClsFromString(t, "infra1=http://localhost:7001,infra2=http://localhost:12345"), + urls: mustNewURLs(t, []string{"http://localhost:12345"}), + + w: false, + }, + } + for i, tt := range tests { + if g := clusterPeerURLsMatch(tt.name, tt.cls, tt.urls); g != tt.w { + t.Errorf("#%d: clusterPeerURLsMatch=%t, want %t", i, g, tt.w) + } + } +} + func TestGenClusterString(t *testing.T) { tests := []struct { token string @@ -38,10 +100,7 @@ func TestGenClusterString(t *testing.T) { }, } for i, tt := range tests { - urls, err := types.NewURLs(tt.urls) - if err != nil { - t.Fatalf("unexpected new urls error: %v", err) - } + urls := mustNewURLs(t, tt.urls) str := genClusterString(tt.token, urls) if str != tt.wstr { t.Errorf("#%d: cluster = %s, want %s", i, str, tt.wstr) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 362b91569..4f88ea647 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -42,11 +42,18 @@ const ( ) type ClusterInfo interface { + // ID returns the cluster ID ID() types.ID + // ClientURLs returns an aggregate set of all URLs on which this + // cluster is listening for client requests ClientURLs() []string // Members returns a slice of members sorted by their ID Members() []*Member + // Member retrieves a particular member based on ID, or nil if the + // member does not exist in the cluster Member(id types.ID) *Member + // IsIDRemoved checks whether the given ID has been removed from this + // cluster at some point in the past IsIDRemoved(id types.ID) bool } @@ -62,8 +69,9 @@ type Cluster struct { sync.Mutex } -// NewClusterFromString returns Cluster through given cluster token and parsing -// members from a sets of names to IPs discovery formatted like: +// NewClusterFromString returns a Cluster instantiated from the given cluster token +// and cluster string, by parsing members from a set of discovery-formatted +// names-to-IPs, like: // mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4 func NewClusterFromString(token string, cluster string) (*Cluster, error) { c := newCluster(token) diff --git a/etcdserver/config.go b/etcdserver/config.go index 35b371048..59d9486fe 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -45,7 +45,7 @@ func (c *ServerConfig) VerifyBootstrapConfig() error { m := c.Cluster.MemberByName(c.Name) // Make sure the cluster at least contains the local server. if m == nil { - return fmt.Errorf("couldn't find local name %s in the initial cluster configuration", c.Name) + return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name) } if uint64(m.ID) == raft.None { return fmt.Errorf("cannot use %x as member id", raft.None) From 1197c1f96500271d974bb4021ccc31f4ab4e960b Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Mon, 10 Nov 2014 13:45:29 -0800 Subject: [PATCH 17/27] etcdserver: move peer URLs check to config --- etcdmain/etcd.go | 44 ++++++++++---------------------- etcdmain/etcd_test.go | 53 --------------------------------------- etcdserver/config.go | 10 ++++++++ etcdserver/config_test.go | 48 +++++++++++++++++++++++++++++++++-- 4 files changed, 69 insertions(+), 86 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 8af5e432b..c5c603872 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -24,8 +24,6 @@ import ( "net/http" "net/url" "os" - "reflect" - "sort" "strings" "github.com/coreos/etcd/discovery" @@ -191,7 +189,11 @@ func Main() { // startEtcd launches the etcd server and HTTP handlers for client/server communication. func startEtcd() error { - cls, err := setupCluster() + apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo) + if err != nil { + return err + } + cls, err := setupCluster(apurls) if err != nil { return fmt.Errorf("error setting up initial cluster: %v", err) } @@ -268,6 +270,7 @@ func startEtcd() error { cfg := &etcdserver.ServerConfig{ Name: *name, ClientURLs: acurls, + PeerURLs: apurls, DataDir: *dir, SnapCount: *snapCount, Cluster: cls, @@ -306,7 +309,11 @@ func startEtcd() error { // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes. func startProxy() error { - cls, err := setupCluster() + apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo) + if err != nil { + return err + } + cls, err := setupCluster(apurls) if err != nil { return fmt.Errorf("error setting up initial cluster: %v", err) } @@ -367,7 +374,7 @@ func startProxy() error { } // setupCluster sets up an initial cluster definition for bootstrap or discovery. -func setupCluster() (*etcdserver.Cluster, error) { +func setupCluster(apurls []url.URL) (*etcdserver.Cluster, error) { set := make(map[string]bool) fs.Visit(func(f *flag.Flag) { set[f.Name] = true @@ -375,12 +382,8 @@ func setupCluster() (*etcdserver.Cluster, error) { if set["discovery"] && set["initial-cluster"] { return nil, fmt.Errorf("both discovery and bootstrap-config are set") } - apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo) - if err != nil { - return nil, err - } - var cls *etcdserver.Cluster + var err error switch { case set["discovery"]: // If using discovery, generate a temporary cluster based on @@ -392,31 +395,10 @@ func setupCluster() (*etcdserver.Cluster, error) { default: // We're statically configured, and cluster has appropriately been set. cls, err = etcdserver.NewClusterFromString(*initialClusterToken, *initialCluster) - // Ensure our own advertised peer URLs match those specified in cluster - if err == nil && !clusterPeerURLsMatch(*name, cls, apurls) { - cls = nil - err = fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", *name) - } } return cls, err } -// clusterPeerURLsMatch checks whether the peer URLs of the member by the given -// name in the given cluster match the provided set of URLs -func clusterPeerURLsMatch(name string, cls *etcdserver.Cluster, urls []url.URL) bool { - m := cls.MemberByName(name) - if m == nil { - // should never happen - log.Panicf("could not find %q in cluster!", name) - } - purls := make([]string, len(urls)) - for i, u := range urls { - purls[i] = u.String() - } - sort.Strings(purls) - return reflect.DeepEqual(purls, m.PeerURLs) -} - func genClusterString(name string, urls types.URLs) string { addrs := make([]string, 0) for _, u := range urls { diff --git a/etcdmain/etcd_test.go b/etcdmain/etcd_test.go index 147c6e05d..dff57f618 100644 --- a/etcdmain/etcd_test.go +++ b/etcdmain/etcd_test.go @@ -20,18 +20,9 @@ import ( "net/url" "testing" - "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/pkg/types" ) -func mustClsFromString(t *testing.T, s string) *etcdserver.Cluster { - cls, err := etcdserver.NewClusterFromString("", s) - if err != nil { - t.Fatalf("error creating cluster from %q: %v", s, err) - } - return cls -} - func mustNewURLs(t *testing.T, urls []string) []url.URL { u, err := types.NewURLs(urls) if err != nil { @@ -40,50 +31,6 @@ func mustNewURLs(t *testing.T, urls []string) []url.URL { return u } -func TestClusterPeerURLsMatch(t *testing.T) { - tests := []struct { - name string - cls *etcdserver.Cluster - urls []url.URL - - w bool - }{ - { - name: "default", - cls: mustClsFromString(t, "default=http://localhost:12345"), - urls: mustNewURLs(t, []string{"http://localhost:12345"}), - - w: true, - }, - { - name: "default", - cls: mustClsFromString(t, "default=http://localhost:7001,other=http://192.168.0.1:7002,default=http://localhost:12345"), - urls: mustNewURLs(t, []string{"http://localhost:7001", "http://localhost:12345"}), - - w: true, - }, - { - name: "infra1", - cls: mustClsFromString(t, "infra1=http://localhost:7001"), - urls: mustNewURLs(t, []string{"http://localhost:12345"}), - - w: false, - }, - { - name: "infra1", - cls: mustClsFromString(t, "infra1=http://localhost:7001,infra2=http://localhost:12345"), - urls: mustNewURLs(t, []string{"http://localhost:12345"}), - - w: false, - }, - } - for i, tt := range tests { - if g := clusterPeerURLsMatch(tt.name, tt.cls, tt.urls); g != tt.w { - t.Errorf("#%d: clusterPeerURLsMatch=%t, want %t", i, g, tt.w) - } - } -} - func TestGenClusterString(t *testing.T) { tests := []struct { token string diff --git a/etcdserver/config.go b/etcdserver/config.go index 59d9486fe..6a434b0d3 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -20,6 +20,8 @@ import ( "fmt" "net/http" "path" + "reflect" + "sort" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -31,6 +33,7 @@ type ServerConfig struct { DiscoveryURL string DiscoveryProxy string ClientURLs types.URLs + PeerURLs types.URLs DataDir string SnapCount uint64 Cluster *Cluster @@ -65,6 +68,13 @@ func (c *ServerConfig) VerifyBootstrapConfig() error { urlMap[url] = true } } + + // Advertised peer URLs must match those in the cluster peer list + apurls := c.PeerURLs.StringSlice() + sort.Strings(apurls) + if !reflect.DeepEqual(apurls, m.PeerURLs) { + return fmt.Errorf("%s has different advertised URLs in the cluster and advertised peer URLs list", c.Name) + } return nil } diff --git a/etcdserver/config_test.go b/etcdserver/config_test.go index d5bda587d..bbfb59882 100644 --- a/etcdserver/config_test.go +++ b/etcdserver/config_test.go @@ -16,12 +16,26 @@ package etcdserver -import "testing" +import ( + "net/url" + "testing" + + "github.com/coreos/etcd/pkg/types" +) + +func mustNewURLs(t *testing.T, urls []string) []url.URL { + u, err := types.NewURLs(urls) + if err != nil { + t.Fatalf("error creating new URLs from %q: %v", urls, err) + } + return u +} func TestBootstrapConfigVerify(t *testing.T) { tests := []struct { clusterSetting string newclst bool + apurls []string disc string shouldError bool }{ @@ -29,35 +43,63 @@ func TestBootstrapConfigVerify(t *testing.T) { // Node must exist in cluster "", true, + nil, "", + true, }, { // Cannot have duplicate URLs in cluster config "node1=http://localhost:7001,node2=http://localhost:7001,node2=http://localhost:7002", true, + nil, "", + true, }, { // Node defined, ClusterState OK "node1=http://localhost:7001,node2=http://localhost:7002", true, + []string{"http://localhost:7001"}, "", + false, }, { // Node defined, discovery OK "node1=http://localhost:7001", false, + []string{"http://localhost:7001"}, "http://discovery", + false, }, { // Cannot have ClusterState!=new && !discovery "node1=http://localhost:7001", false, + nil, "", + + true, + }, + { + // Advertised peer URLs must match those in cluster-state + "node1=http://localhost:7001", + true, + []string{"http://localhost:12345"}, + "", + + true, + }, + { + // Advertised peer URLs must match those in cluster-state + "node1=http://localhost:7001,node1=http://localhost:12345", + true, + []string{"http://localhost:12345"}, + "", + true, }, } @@ -67,13 +109,15 @@ func TestBootstrapConfigVerify(t *testing.T) { if err != nil { t.Fatalf("#%d: Got unexpected error: %v", i, err) } - cfg := ServerConfig{ Name: "node1", DiscoveryURL: tt.disc, Cluster: cluster, NewCluster: tt.newclst, } + if tt.apurls != nil { + cfg.PeerURLs = mustNewURLs(t, tt.apurls) + } err = cfg.VerifyBootstrapConfig() if (err == nil) && tt.shouldError { t.Errorf("%#v", *cluster) From d1ae276434fc6b6229dd3dc9ca22030798e0cff9 Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Wed, 12 Nov 2014 13:11:36 -0800 Subject: [PATCH 18/27] integration: fix test to propagate NewServer errors --- integration/cluster_test.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index d648287a5..353898031 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -25,7 +25,6 @@ import ( "net/http/httptest" "os" "strings" - "sync" "testing" "time" @@ -154,17 +153,19 @@ func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster { } func (c *cluster) Launch(t *testing.T) { - var wg sync.WaitGroup + errc := make(chan error) for _, m := range c.Members { - wg.Add(1) // Members are launched in separate goroutines because if they boot // using discovery url, they have to wait for others to register to continue. go func(m *member) { - m.Launch(t) - wg.Done() + errc <- m.Launch(t) }(m) } - wg.Wait() + for _ = range c.Members { + if err := <-errc; err != nil { + t.Fatalf("error setting up member: %v", err) + } + } // wait cluster to be stable to receive future client requests c.waitClientURLsPublished(t) } @@ -243,15 +244,23 @@ type member struct { func mustNewMember(t *testing.T, name string) *member { var err error m := &member{} + pln := newLocalListener(t) m.PeerListeners = []net.Listener{pln} + m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()}) + if err != nil { + t.Fatal(err) + } + cln := newLocalListener(t) m.ClientListeners = []net.Listener{cln} - m.Name = name m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()}) if err != nil { t.Fatal(err) } + + m.Name = name + m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") if err != nil { t.Fatal(err) @@ -268,10 +277,10 @@ func mustNewMember(t *testing.T, name string) *member { // Launch starts a member based on ServerConfig, PeerListeners // and ClientListeners. -func (m *member) Launch(t *testing.T) { +func (m *member) Launch(t *testing.T) error { var err error if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { - t.Fatalf("failed to initialize the etcd server: %v", err) + return fmt.Errorf("failed to initialize the etcd server: %v", err) } m.s.Ticker = time.Tick(tickDuration) m.s.SyncTicker = time.Tick(10 * tickDuration) @@ -293,6 +302,7 @@ func (m *member) Launch(t *testing.T) { hs.Start() m.hss = append(m.hss, hs) } + return nil } // Stop stops the member, but the data dir of the member is preserved. From d834324e973e22900cade94ebf3b944f9a2c58b1 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 12 Nov 2014 12:32:20 -0800 Subject: [PATCH 19/27] raft: stop the node synchronously --- raft/node.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/raft/node.go b/raft/node.go index 003db937f..13a50db31 100644 --- a/raft/node.go +++ b/raft/node.go @@ -192,6 +192,7 @@ type node struct { advancec chan struct{} tickc chan struct{} done chan struct{} + stop chan struct{} } func newNode() node { @@ -204,11 +205,13 @@ func newNode() node { advancec: make(chan struct{}), tickc: make(chan struct{}), done: make(chan struct{}), + stop: make(chan struct{}), } } func (n *node) Stop() { - close(n.done) + n.stop <- struct{}{} + <-n.stop } func (n *node) run(r *raft) { @@ -302,7 +305,9 @@ func (n *node) run(r *raft) { } r.raftLog.stableTo(prevLastUnstablei) advancec = nil - case <-n.done: + case <-n.stop: + n.stop <- struct{}{} + close(n.done) return } } From 5cef3d888a2d5ed5dab852d577c396bda8fbde45 Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Wed, 12 Nov 2014 14:11:56 -0800 Subject: [PATCH 20/27] integration: remove unnecessary t.Testing argument --- integration/cluster_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 353898031..b8863c917 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -158,7 +158,7 @@ func (c *cluster) Launch(t *testing.T) { // Members are launched in separate goroutines because if they boot // using discovery url, they have to wait for others to register to continue. go func(m *member) { - errc <- m.Launch(t) + errc <- m.Launch() }(m) } for _ = range c.Members { @@ -277,7 +277,7 @@ func mustNewMember(t *testing.T, name string) *member { // Launch starts a member based on ServerConfig, PeerListeners // and ClientListeners. -func (m *member) Launch(t *testing.T) error { +func (m *member) Launch() error { var err error if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { return fmt.Errorf("failed to initialize the etcd server: %v", err) From bc9de47a9a6bd8a5069694f20161a0199cc21bb9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 6 Nov 2014 15:38:59 -0800 Subject: [PATCH 21/27] integration: add increase cluster size test --- integration/cluster_test.go | 128 ++++++++++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 28 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index b8863c917..e3f34765e 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -24,6 +24,8 @@ import ( "net/http" "net/http/httptest" "os" + "reflect" + "sort" "strings" "testing" "time" @@ -82,6 +84,21 @@ func testClusterUsingDiscovery(t *testing.T, size int) { clusterMustProgress(t, c) } +func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) } +func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) } + +func testDoubleClusterSize(t *testing.T, size int) { + defer afterTest(t) + c := NewCluster(t, size) + c.Launch(t) + defer c.Terminate(t) + + for i := 0; i < size; i++ { + c.AddMember(t) + } + clusterMustProgress(t, c) +} + // clusterMustProgress ensures that cluster can make progress. It creates // a key first, and check the new key could be got from all client urls of // the cluster. @@ -167,7 +184,7 @@ func (c *cluster) Launch(t *testing.T) { } } // wait cluster to be stable to receive future client requests - c.waitClientURLsPublished(t) + c.waitMembersMatch(t, c.HTTPMembers()) } func (c *cluster) URL(i int) string { @@ -184,47 +201,94 @@ func (c *cluster) URLs() []string { return urls } +func (c *cluster) HTTPMembers() []httptypes.Member { + ms := make([]httptypes.Member, len(c.Members)) + for i, m := range c.Members { + ms[i].Name = m.Name + for _, ln := range m.PeerListeners { + ms[i].PeerURLs = append(ms[i].PeerURLs, "http://"+ln.Addr().String()) + } + for _, ln := range m.ClientListeners { + ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String()) + } + } + return ms +} + +func (c *cluster) AddMember(t *testing.T) { + clusterStr := c.Members[0].Cluster.String() + idx := len(c.Members) + m := mustNewMember(t, c.name(idx)) + + // send add request to the cluster + cc := mustNewHTTPClient(t, []string{c.URL(0)}) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + peerURL := "http://" + m.PeerListeners[0].Addr().String() + if _, err := ma.Add(ctx, peerURL); err != nil { + t.Fatalf("add member on %s error: %v", c.URL(0), err) + } + cancel() + + // wait for the add node entry applied in the cluster + members := append(c.HTTPMembers(), httptypes.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) + c.waitMembersMatch(t, members) + + for _, ln := range m.PeerListeners { + clusterStr += fmt.Sprintf(",%s=http://%s", m.Name, ln.Addr().String()) + } + var err error + m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr) + if err != nil { + t.Fatal(err) + } + m.NewCluster = false + if err := m.Launch(); err != nil { + t.Fatal(err) + } + c.Members = append(c.Members, m) + // wait cluster to be stable to receive future client requests + c.waitMembersMatch(t, c.HTTPMembers()) +} + func (c *cluster) Terminate(t *testing.T) { for _, m := range c.Members { m.Terminate(t) } } -func (c *cluster) waitClientURLsPublished(t *testing.T) { - timer := time.AfterFunc(10*time.Second, func() { - t.Fatal("wait too long for client urls publish") - }) - cc := mustNewHTTPClient(t, []string{c.URL(0)}) - ma := client.NewMembersAPI(cc) - for { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - membs, err := ma.List(ctx) - cancel() - if err == nil && c.checkClientURLsPublished(membs) { - break +func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) { + for _, u := range c.URLs() { + cc := mustNewHTTPClient(t, []string{u}) + ma := client.NewMembersAPI(cc) + for { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ms, err := ma.List(ctx) + cancel() + if err == nil && isMembersEqual(ms, membs) { + break + } + time.Sleep(tickDuration) } - time.Sleep(tickDuration) } - timer.Stop() return } -func (c *cluster) checkClientURLsPublished(membs []httptypes.Member) bool { - if len(membs) != len(c.Members) { - return false - } - for _, m := range membs { - if len(m.ClientURLs) == 0 { - return false - } - } - return true -} - func (c *cluster) name(i int) string { return fmt.Sprint("node", i) } +// isMembersEqual checks whether two members equal except ID field. +// The given wmembs should always set ID field to empty string. +func isMembersEqual(membs []httptypes.Member, wmembs []httptypes.Member) bool { + sort.Sort(SortableMemberSliceByPeerURLs(membs)) + sort.Sort(SortableMemberSliceByPeerURLs(wmembs)) + for i := range membs { + membs[i].ID = "" + } + return reflect.DeepEqual(membs, wmembs) +} + func newLocalListener(t *testing.T) net.Listener { l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -283,7 +347,7 @@ func (m *member) Launch() error { return fmt.Errorf("failed to initialize the etcd server: %v", err) } m.s.Ticker = time.Tick(tickDuration) - m.s.SyncTicker = time.Tick(10 * tickDuration) + m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() for _, ln := range m.PeerListeners { @@ -342,3 +406,11 @@ func newTransport() *http.Transport { tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial return tr } + +type SortableMemberSliceByPeerURLs []httptypes.Member + +func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } From 68ab7e69e1186d736ece2670bec56d319e6aca43 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 12 Nov 2014 14:39:07 -0800 Subject: [PATCH 22/27] raft: add a test for node proposal --- raft/node_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/raft/node_test.go b/raft/node_test.go index a2f43d265..313d852cc 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -107,6 +107,41 @@ func TestNodeStepUnblock(t *testing.T) { } } +// TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft. +func TestNodePropose(t *testing.T) { + msgs := []raftpb.Message{} + appendStep := func(r *raft, m raftpb.Message) { + msgs = append(msgs, m) + } + + n := newNode() + r := newRaft(1, []uint64{1}, 10, 1) + go n.run(r) + n.Campaign(context.TODO()) + for { + rd := <-n.Ready() + // change the step function to appendStep until this raft becomes leader + if rd.SoftState.Lead == r.id { + r.step = appendStep + n.Advance() + break + } + n.Advance() + } + n.Propose(context.TODO(), []byte("somedata")) + n.Stop() + + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + } + if msgs[0].Type != raftpb.MsgProp { + t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp) + } + if !reflect.DeepEqual(msgs[0].Entries[0].Data, []byte("somedata")) { + t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata")) + } +} + // TestBlockProposal ensures that node will block proposal when it does not // know who is the current leader; node will accept proposal when it knows // who is the current leader. From 2cedf127d481bbbfae138445ebbcbcb025d208f4 Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Wed, 12 Nov 2014 15:02:48 -0800 Subject: [PATCH 23/27] raft: block Stop() on n.done, support idempotency --- raft/node.go | 12 +++++++++--- raft/node_test.go | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/raft/node.go b/raft/node.go index e554e3e19..35750e733 100644 --- a/raft/node.go +++ b/raft/node.go @@ -210,8 +210,15 @@ func newNode() node { } func (n *node) Stop() { - n.stop <- struct{}{} - <-n.stop + select { + case n.stop <- struct{}{}: + // Not already stopped, so trigger it + case <-n.done: + // Node has already been stopped - no need to do anything + return + } + // Block until the stop has been acknowledged by run() + <-n.done } func (n *node) run(r *raft) { @@ -306,7 +313,6 @@ func (n *node) run(r *raft) { r.raftLog.stableTo(prevLastUnstablei) advancec = nil case <-n.stop: - n.stop <- struct{}{} close(n.done) return } diff --git a/raft/node_test.go b/raft/node_test.go index a2f43d265..7e6617d68 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -141,7 +141,7 @@ func TestBlockProposal(t *testing.T) { } // TestNodeTick ensures that node.Tick() will increase the -// elapsed of the underly raft state machine. +// elapsed of the underlying raft state machine. func TestNodeTick(t *testing.T) { n := newNode() r := newRaft(1, []uint64{1}, 10, 1) @@ -154,6 +154,40 @@ func TestNodeTick(t *testing.T) { } } +// TestNodeStop ensures that node.Stop() blocks until the node has stopped +// processing, and that it is idempotent +func TestNodeStop(t *testing.T) { + n := newNode() + r := newRaft(1, []uint64{1}, 10, 1) + donec := make(chan struct{}) + + go func() { + n.run(r) + close(donec) + }() + + elapsed := r.elapsed + n.Tick() + n.Stop() + + select { + case <-donec: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for node to stop!") + } + + if r.elapsed != elapsed+1 { + t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1) + } + // Further ticks should have no effect, the node is stopped. + n.Tick() + if r.elapsed != elapsed+1 { + t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1) + } + // Subsequent Stops should have no effect. + n.Stop() +} + func TestReadyContainUpdates(t *testing.T) { tests := []struct { rd Ready From 2a407dadc0e20d020d3619d8032681e6063bb018 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 12 Nov 2014 16:16:26 -0800 Subject: [PATCH 24/27] raft: add a test for proposeConfChange --- raft/node_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/raft/node_test.go b/raft/node_test.go index 313d852cc..240bccf34 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -142,6 +142,47 @@ func TestNodePropose(t *testing.T) { } } +// TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal +// to the underlying raft. +func TestNodeProposeConfig(t *testing.T) { + msgs := []raftpb.Message{} + appendStep := func(r *raft, m raftpb.Message) { + msgs = append(msgs, m) + } + + n := newNode() + r := newRaft(1, []uint64{1}, 10, 1) + go n.run(r) + n.Campaign(context.TODO()) + for { + rd := <-n.Ready() + // change the step function to appendStep until this raft becomes leader + if rd.SoftState.Lead == r.id { + r.step = appendStep + n.Advance() + break + } + n.Advance() + } + cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + ccdata, err := cc.Marshal() + if err != nil { + t.Fatal(err) + } + n.ProposeConfChange(context.TODO(), cc) + n.Stop() + + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + } + if msgs[0].Type != raftpb.MsgProp { + t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp) + } + if !reflect.DeepEqual(msgs[0].Entries[0].Data, ccdata) { + t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata) + } +} + // TestBlockProposal ensures that node will block proposal when it does not // know who is the current leader; node will accept proposal when it knows // who is the current leader. From 0c2b45ddc6497b351cb85395b258c91ef92f04d7 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 5 Nov 2014 16:40:15 -0800 Subject: [PATCH 25/27] etcdserver: not record attributes when add member There is no need to set attributes value when adding member because new member will publish the information whenever it starts. --- etcdserver/cluster.go | 41 +++++++++++----------- etcdserver/cluster_test.go | 36 +++++++------------ etcdserver/server_test.go | 12 ++++--- integration/v2_http_kv_test.go | 64 +++++++++++++++++----------------- 4 files changed, 73 insertions(+), 80 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 4f88ea647..c31aca01d 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -327,7 +327,8 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { return nil } -// AddMember puts a new Member into the store. +// AddMember adds a new Member into the cluster, and saves the given member's +// raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. func (c *Cluster) AddMember(m *Member) { c.Lock() @@ -340,14 +341,6 @@ func (c *Cluster) AddMember(m *Member) { if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { log.Panicf("create raftAttributes should never fail: %v", err) } - b, err = json.Marshal(m.Attributes) - if err != nil { - log.Panicf("marshal attributes should never fail: %v", err) - } - p = path.Join(memberStoreKey(m.ID), attributesSuffix) - if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { - log.Panicf("create attributes should never fail: %v", err) - } c.members[m.ID] = m } @@ -390,20 +383,26 @@ func (c *Cluster) UpdateMember(nm *Member) { // the child nodes of the given node should be sorted by key. func nodeToMember(n *store.NodeExtern) (*Member, error) { m := &Member{ID: mustParseMemberIDFromKey(n.Key)} - if len(n.Nodes) != 2 { - return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes)) + attrs := make(map[string][]byte) + raftAttrKey := path.Join(n.Key, raftAttributesSuffix) + attrKey := path.Join(n.Key, attributesSuffix) + for _, nn := range n.Nodes { + if nn.Key != raftAttrKey && nn.Key != attrKey { + return nil, fmt.Errorf("unknown key %q", nn.Key) + } + attrs[nn.Key] = []byte(*nn.Value) } - if w := path.Join(n.Key, attributesSuffix); n.Nodes[0].Key != w { - return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w) + if data := attrs[raftAttrKey]; data != nil { + if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { + return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) + } + } else { + return nil, fmt.Errorf("raftAttributes key doesn't exist") } - if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil { - return m, fmt.Errorf("unmarshal attributes error: %v", err) - } - if w := path.Join(n.Key, raftAttributesSuffix); n.Nodes[1].Key != w { - return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w) - } - if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil { - return m, fmt.Errorf("unmarshal raftAttributes error: %v", err) + if data := attrs[attrKey]; data != nil { + if err := json.Unmarshal(data, &m.Attributes); err != nil { + return m, fmt.Errorf("unmarshal attributes error: %v", err) + } } return m, nil } diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 08c34a9b5..77323fe33 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -82,20 +82,21 @@ func TestClusterFromStore(t *testing.T) { mems []*Member }{ { - []*Member{newTestMember(1, nil, "node1", nil)}, + []*Member{newTestMember(1, nil, "", nil)}, }, { nil, }, { []*Member{ - newTestMember(1, nil, "node1", nil), - newTestMember(2, nil, "node2", nil), + newTestMember(1, nil, "", nil), + newTestMember(2, nil, "", nil), }, }, } for i, tt := range tests { hc := newTestCluster(nil) + hc.SetStore(store.New()) for _, m := range tt.mems { hc.AddMember(m) } @@ -500,22 +501,22 @@ func TestNodeToMemberBad(t *testing.T) { {Key: "/1234/strange"}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp("garbage")}, + {Key: "/1234/raftAttributes", Value: stringp("garbage")}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/attributes", Value: stringp(`{"name":"node1","clientURLs":null}`)}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/raftAttributes", Value: stringp(`{"peerURLs":null}`)}, {Key: "/1234/strange"}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, - {Key: "/1234/static", Value: stringp("garbage")}, + {Key: "/1234/raftAttributes", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/attributes", Value: stringp("garbage")}, }}, {Key: "/1234", Nodes: []*store.NodeExtern{ - {Key: "/1234/dynamic", Value: stringp(`{"peerURLs":null}`)}, - {Key: "/1234/static", Value: stringp(`{"name":"node1","clientURLs":null}`)}, + {Key: "/1234/raftAttributes", Value: stringp(`{"peerURLs":null}`)}, + {Key: "/1234/attributes", Value: stringp(`{"name":"node1","clientURLs":null}`)}, {Key: "/1234/strange"}, }}, } @@ -543,16 +544,6 @@ func TestClusterAddMember(t *testing.T) { store.Permanent, }, }, - { - name: "Create", - params: []interface{}{ - path.Join(storeMembersPrefix, "1", "attributes"), - false, - `{"name":"node1"}`, - false, - store.Permanent, - }, - }, } if g := st.Action(); !reflect.DeepEqual(g, wactions) { t.Errorf("actions = %v, want %v", g, wactions) @@ -656,9 +647,8 @@ func TestNodeToMember(t *testing.T) { func newTestCluster(membs []*Member) *Cluster { c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} - c.store = store.New() - for i := range membs { - c.AddMember(membs[i]) + for _, m := range membs { + c.members[m.ID] = m } return c } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 566e8cc21..14bbed022 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -554,8 +554,8 @@ func testServer(t *testing.T, ns uint64) { g, w := resp.Event.Node, &store.NodeExtern{ Key: "/foo", - ModifiedIndex: uint64(i) + 2*ns, - CreatedIndex: uint64(i) + 2*ns, + ModifiedIndex: uint64(i) + ns, + CreatedIndex: uint64(i) + ns, Value: stringp("bar"), } @@ -993,7 +993,9 @@ func TestRemoveMember(t *testing.T) { Nodes: []uint64{1234, 2345, 3456}, }, } - cl := newTestCluster([]*Member{{ID: 1234}}) + cl := newTestCluster(nil) + cl.SetStore(store.New()) + cl.AddMember(&Member{ID: 1234}) s := &EtcdServer{ node: n, store: &storeRecorder{}, @@ -1027,7 +1029,9 @@ func TestUpdateMember(t *testing.T) { Nodes: []uint64{1234, 2345, 3456}, }, } - cl := newTestCluster([]*Member{{ID: 1234}}) + cl := newTestCluster(nil) + cl.SetStore(store.New()) + cl.AddMember(&Member{ID: 1234}) s := &EtcdServer{ node: n, store: &storeRecorder{}, diff --git a/integration/v2_http_kv_test.go b/integration/v2_http_kv_test.go index 6047187b7..55addb7cf 100644 --- a/integration/v2_http_kv_test.go +++ b/integration/v2_http_kv_test.go @@ -55,19 +55,19 @@ func TestV2Set(t *testing.T) { "/v2/keys/foo/bar", v, http.StatusCreated, - `{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":4,"createdIndex":4}}`, + `{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":3,"createdIndex":3}}`, }, { "/v2/keys/foodir?dir=true", url.Values{}, http.StatusCreated, - `{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":5,"createdIndex":5}}`, + `{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":4,"createdIndex":4}}`, }, { "/v2/keys/fooempty", url.Values(map[string][]string{"value": {""}}), http.StatusCreated, - `{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":6,"createdIndex":6}}`, + `{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":5,"createdIndex":5}}`, }, } @@ -216,12 +216,12 @@ func TestV2CAS(t *testing.T) { }, { "/v2/keys/cas/foo", - url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"4"}}), + url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"3"}}), http.StatusOK, map[string]interface{}{ "node": map[string]interface{}{ "value": "YYY", - "modifiedIndex": float64(5), + "modifiedIndex": float64(4), }, "action": "compareAndSwap", }, @@ -233,8 +233,8 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[10 != 5]", - "index": float64(5), + "cause": "[10 != 4]", + "index": float64(4), }, }, { @@ -283,7 +283,7 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[bad_value != ZZZ] [100 != 6]", + "cause": "[bad_value != ZZZ] [100 != 5]", }, }, { @@ -293,12 +293,12 @@ func TestV2CAS(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[100 != 6]", + "cause": "[100 != 5]", }, }, { "/v2/keys/cas/foo", - url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"6"}}), + url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"5"}}), http.StatusPreconditionFailed, map[string]interface{}{ "errorCode": float64(101), @@ -448,7 +448,7 @@ func TestV2CAD(t *testing.T) { map[string]interface{}{ "errorCode": float64(101), "message": "Compare failed", - "cause": "[100 != 4]", + "cause": "[100 != 3]", }, }, { @@ -460,12 +460,12 @@ func TestV2CAD(t *testing.T) { }, }, { - "/v2/keys/foo?prevIndex=4", + "/v2/keys/foo?prevIndex=3", http.StatusOK, map[string]interface{}{ "node": map[string]interface{}{ "key": "/foo", - "modifiedIndex": float64(6), + "modifiedIndex": float64(5), }, "action": "compareAndDelete", }, @@ -493,7 +493,7 @@ func TestV2CAD(t *testing.T) { map[string]interface{}{ "node": map[string]interface{}{ "key": "/foovalue", - "modifiedIndex": float64(7), + "modifiedIndex": float64(6), }, "action": "compareAndDelete", }, @@ -531,7 +531,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/foo/4", + "key": "/foo/3", "value": "XXX", }, "action": "create", @@ -543,7 +543,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/foo/5", + "key": "/foo/4", "value": "XXX", }, "action": "create", @@ -555,7 +555,7 @@ func TestV2Unique(t *testing.T) { http.StatusCreated, map[string]interface{}{ "node": map[string]interface{}{ - "key": "/bar/6", + "key": "/bar/5", "value": "XXX", }, "action": "create", @@ -617,8 +617,8 @@ func TestV2Get(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -636,14 +636,14 @@ func TestV2Get(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), "nodes": []interface{}{ map[string]interface{}{ "key": "/foo/bar/zar", "value": "XXX", - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -711,8 +711,8 @@ func TestV2QuorumGet(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -730,14 +730,14 @@ func TestV2QuorumGet(t *testing.T) { map[string]interface{}{ "key": "/foo/bar", "dir": true, - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), "nodes": []interface{}{ map[string]interface{}{ "key": "/foo/bar/zar", "value": "XXX", - "createdIndex": float64(4), - "modifiedIndex": float64(4), + "createdIndex": float64(3), + "modifiedIndex": float64(3), }, }, }, @@ -797,7 +797,7 @@ func TestV2Watch(t *testing.T) { "node": map[string]interface{}{ "key": "/foo/bar", "value": "XXX", - "modifiedIndex": float64(4), + "modifiedIndex": float64(3), }, "action": "set", } @@ -818,7 +818,7 @@ func TestV2WatchWithIndex(t *testing.T) { var body map[string]interface{} c := make(chan bool, 1) go func() { - resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=5")) + resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=4")) body = tc.ReadBodyJSON(resp) c <- true }() @@ -855,7 +855,7 @@ func TestV2WatchWithIndex(t *testing.T) { "node": map[string]interface{}{ "key": "/foo/bar", "value": "XXX", - "modifiedIndex": float64(5), + "modifiedIndex": float64(4), }, "action": "set", } From ba915ad5a8e0eaf7f5d03387b0a8023bdb484d7a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 12 Nov 2014 20:37:50 -0800 Subject: [PATCH 26/27] etcdserver: do not add/remove/update local member to/from sender hub --- etcdserver/server.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 1794f92b1..da9ee72ab 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -684,13 +684,21 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { log.Panicf("nodeID should always be equal to member ID") } s.Cluster.AddMember(m) - s.sender.Add(m) - log.Printf("etcdserver: added member %s %v to cluster %s", types.ID(cc.NodeID), m.PeerURLs, s.Cluster.ID()) + if m.ID == s.id { + log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } else { + s.sender.Add(m) + log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) s.Cluster.RemoveMember(id) - s.sender.Remove(id) - log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) + if id == s.id { + log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID()) + } else { + s.sender.Remove(id) + log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) + } case raftpb.ConfChangeUpdateNode: m := new(Member) if err := json.Unmarshal(cc.Context, m); err != nil { @@ -700,8 +708,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { log.Panicf("nodeID should always be equal to member ID") } s.Cluster.UpdateMember(m) - s.sender.Update(m) - log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + if m.ID == s.id { + log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } else { + s.sender.Update(m) + log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + } } return nil } From 0d18a0f3813d1bf628dd4f5206386a930c94dc7e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 13 Nov 2014 09:11:53 -0800 Subject: [PATCH 27/27] pkg/wait: move wait to pkg/wait --- etcdserver/server.go | 2 +- {wait => pkg/wait}/wait.go | 0 {wait => pkg/wait}/wait_test.go | 0 test | 2 +- 4 files changed, 2 insertions(+), 2 deletions(-) rename {wait => pkg/wait}/wait.go (100%) rename {wait => pkg/wait}/wait_test.go (100%) diff --git a/etcdserver/server.go b/etcdserver/server.go index da9ee72ab..ad7566b5a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -37,11 +37,11 @@ import ( "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" - "github.com/coreos/etcd/wait" "github.com/coreos/etcd/wal" ) diff --git a/wait/wait.go b/pkg/wait/wait.go similarity index 100% rename from wait/wait.go rename to pkg/wait/wait.go diff --git a/wait/wait_test.go b/pkg/wait/wait_test.go similarity index 100% rename from wait/wait_test.go rename to pkg/wait/wait_test.go diff --git a/test b/test index 9b99c2a68..7fd900684 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration pkg/flags pkg/types pkg/transport proxy raft snap store wait wal" +TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/" # user has not provided PKG override