From bf2289ae008e58927bf89acd94b41a734db1013a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 7 Apr 2016 12:02:37 -0700 Subject: [PATCH] etcdserver: move membership related code to membership pkg --- e2e/etcd_test.go | 2 +- etcdserver/api/cluster.go | 6 +- etcdserver/api/v2http/client.go | 21 ++-- etcdserver/api/v2http/client_test.go | 57 ++++----- etcdserver/api/v2http/http.go | 1 + etcdserver/api/v2http/http_test.go | 15 +-- etcdserver/api/v2http/peer_test.go | 10 +- etcdserver/api/v3rpc/member.go | 19 +-- etcdserver/cluster_util.go | 28 ++--- etcdserver/errors.go | 11 -- etcdserver/{ => membership}/cluster.go | 133 +++++++++++--------- etcdserver/{ => membership}/cluster_test.go | 20 +-- etcdserver/membership/errors.go | 33 +++++ etcdserver/{ => membership}/member.go | 35 ++++-- etcdserver/{ => membership}/member_test.go | 2 +- etcdserver/raft.go | 15 +-- etcdserver/raft_test.go | 5 +- etcdserver/server.go | 47 +++---- etcdserver/server_test.go | 103 ++++++++------- etcdserver/util.go | 3 +- pkg/types/urls.go | 8 ++ 21 files changed, 319 insertions(+), 255 deletions(-) rename etcdserver/{ => membership}/cluster.go (73%) rename etcdserver/{ => membership}/cluster_test.go (97%) create mode 100644 etcdserver/membership/errors.go rename etcdserver/{ => membership}/member.go (82%) rename etcdserver/{ => membership}/member_test.go (99%) diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go index d49fe7071..47421be89 100644 --- a/e2e/etcd_test.go +++ b/e2e/etcd_test.go @@ -252,7 +252,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, // wait for cluster to start readyC := make(chan error, cfg.clusterSize+cfg.proxySize) - readyStr := "etcdserver: set the initial cluster version to" + readyStr := "membership: set the initial cluster version to" for i := range etcdCfgs { go func(etcdp *etcdProcess) { rs := readyStr diff --git a/etcdserver/api/cluster.go b/etcdserver/api/cluster.go index a50c601bb..c21eccba3 100644 --- a/etcdserver/api/cluster.go +++ b/etcdserver/api/cluster.go @@ -15,7 +15,7 @@ package api import ( - "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/types" "github.com/coreos/go-semver/semver" @@ -29,10 +29,10 @@ type Cluster interface { // cluster is listening for client requests ClientURLs() []string // Members returns a slice of members sorted by their ID - Members() []*etcdserver.Member + Members() []*membership.Member // Member retrieves a particular member based on ID, or nil if the // member does not exist in the cluster - Member(id types.ID) *etcdserver.Member + Member(id types.ID) *membership.Member // IsIDRemoved checks whether the given ID has been removed from this // cluster at some point in the past IsIDRemoved(id types.ID) bool diff --git a/etcdserver/api/v2http/client.go b/etcdserver/api/v2http/client.go index 704bb6f93..9f6e3b5e4 100644 --- a/etcdserver/api/v2http/client.go +++ b/etcdserver/api/v2http/client.go @@ -34,6 +34,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/auth" "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -248,10 +249,10 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } now := h.clock.Now() - m := etcdserver.NewMember("", req.PeerURLs, "", &now) + m := membership.NewMember("", req.PeerURLs, "", &now) err := h.server.AddMember(ctx, *m) switch { - case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists: + case err == membership.ErrIDExists || err == membership.ErrPeerURLexists: writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) return case err != nil: @@ -272,9 +273,9 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } err := h.server.RemoveMember(ctx, uint64(id)) switch { - case err == etcdserver.ErrIDRemoved: + case err == membership.ErrIDRemoved: writeError(w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) - case err == etcdserver.ErrIDNotFound: + case err == membership.ErrIDNotFound: writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) case err != nil: plog.Errorf("error removing member %s (%v)", id, err) @@ -291,15 +292,15 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if ok := unmarshalRequest(r, &req, w); !ok { return } - m := etcdserver.Member{ + m := membership.Member{ ID: id, - RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, + RaftAttributes: membership.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, } err := h.server.UpdateMember(ctx, m) switch { - case err == etcdserver.ErrPeerURLexists: + case err == membership.ErrPeerURLexists: writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) - case err == etcdserver.ErrIDNotFound: + case err == membership.ErrIDNotFound: writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) case err != nil: plog.Errorf("error updating member %s (%v)", m.ID, err) @@ -804,7 +805,7 @@ func trimPrefix(p, prefix string) (s string) { return } -func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection { +func newMemberCollection(ms []*membership.Member) *httptypes.MemberCollection { c := httptypes.MemberCollection(make([]httptypes.Member, len(ms))) for i, m := range ms { @@ -814,7 +815,7 @@ func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection { return &c } -func newMember(m *etcdserver.Member) httptypes.Member { +func newMember(m *membership.Member) httptypes.Member { tm := httptypes.Member{ ID: m.ID.String(), Name: m.Name, diff --git a/etcdserver/api/v2http/client_test.go b/etcdserver/api/v2http/client_test.go index bc5b933f2..b71eb25ef 100644 --- a/etcdserver/api/v2http/client_test.go +++ b/etcdserver/api/v2http/client_test.go @@ -32,6 +32,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" @@ -103,7 +104,7 @@ func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error { s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}}) return nil } -func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error { +func (s *serverRecorder) AddMember(_ context.Context, m membership.Member) error { s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}}) return nil } @@ -112,7 +113,7 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { return nil } -func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error { +func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) error { s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}}) return nil } @@ -149,9 +150,9 @@ func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.R return rs.res, nil } func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) AddMember(_ context.Context, _ membership.Member) error { return nil } func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } -func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) error { return nil } func (rs *resServer) ClusterVersion() *semver.Version { return nil } func boolp(b bool) *bool { return &b } @@ -600,11 +601,11 @@ func TestGoodParseRequest(t *testing.T) { } func TestServeMembers(t *testing.T) { - memb1 := etcdserver.Member{ID: 12, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} - memb2 := etcdserver.Member{ID: 13, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} + memb1 := membership.Member{ID: 12, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}} + memb2 := membership.Member{ID: 13, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}} cluster := &fakeCluster{ id: 1, - members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, + members: map[uint64]*membership.Member{1: &memb1, 2: &memb2}, } h := &membersHandler{ server: &serverRecorder{}, @@ -653,11 +654,11 @@ func TestServeMembers(t *testing.T) { // TODO: consolidate **ALL** fake server implementations and add no leader test case. func TestServeLeader(t *testing.T) { - memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} - memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} + memb1 := membership.Member{ID: 1, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}} + memb2 := membership.Member{ID: 2, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}} cluster := &fakeCluster{ id: 1, - members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, + members: map[uint64]*membership.Member{1: &memb1, 2: &memb2}, } h := &membersHandler{ server: &serverRecorder{}, @@ -741,9 +742,9 @@ func TestServeMembersCreate(t *testing.T) { t.Errorf("got body=%q, want %q", g, wb) } - wm := etcdserver.Member{ + wm := membership.Member{ ID: 3064321551348478165, - RaftAttributes: etcdserver.RaftAttributes{ + RaftAttributes: membership.RaftAttributes{ PeerURLs: []string{"http://127.0.0.1:1"}, }, } @@ -816,9 +817,9 @@ func TestServeMembersUpdate(t *testing.T) { t.Errorf("cid = %s, want %s", gcid, wcid) } - wm := etcdserver.Member{ + wm := membership.Member{ ID: 1, - RaftAttributes: etcdserver.RaftAttributes{ + RaftAttributes: membership.RaftAttributes{ PeerURLs: []string{"http://127.0.0.1:1"}, }, } @@ -913,7 +914,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - etcdserver.ErrIDExists, + membership.ErrIDExists, }, http.StatusConflict, @@ -927,7 +928,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - etcdserver.ErrPeerURLexists, + membership.ErrPeerURLexists, }, http.StatusConflict, @@ -951,7 +952,7 @@ func TestServeMembersFail(t *testing.T) { Method: "DELETE", }, &errServer{ - etcdserver.ErrIDRemoved, + membership.ErrIDRemoved, }, http.StatusGone, @@ -963,7 +964,7 @@ func TestServeMembersFail(t *testing.T) { Method: "DELETE", }, &errServer{ - etcdserver.ErrIDNotFound, + membership.ErrIDNotFound, }, http.StatusNotFound, @@ -1047,7 +1048,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - etcdserver.ErrPeerURLexists, + membership.ErrPeerURLexists, }, http.StatusConflict, @@ -1061,7 +1062,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - etcdserver.ErrIDNotFound, + membership.ErrIDNotFound, }, http.StatusNotFound, @@ -1963,16 +1964,16 @@ func TestTrimPrefix(t *testing.T) { } func TestNewMemberCollection(t *testing.T) { - fixture := []*etcdserver.Member{ + fixture := []*membership.Member{ { ID: 12, - Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}}, - RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}}, + Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}}, + RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}}, }, { ID: 13, - Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"}}, - RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"}}, + Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"}}, + RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"}}, }, } got := newMemberCollection(fixture) @@ -1996,10 +1997,10 @@ func TestNewMemberCollection(t *testing.T) { } func TestNewMember(t *testing.T) { - fixture := &etcdserver.Member{ + fixture := &membership.Member{ ID: 12, - Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}}, - RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}}, + Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}}, + RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}}, } got := newMember(fixture) diff --git a/etcdserver/api/v2http/http.go b/etcdserver/api/v2http/http.go index d4a5eab7a..ba44d9514 100644 --- a/etcdserver/api/v2http/http.go +++ b/etcdserver/api/v2http/http.go @@ -23,6 +23,7 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" + "github.com/coreos/etcd/etcdserver/auth" "github.com/coreos/etcd/pkg/logutil" "github.com/coreos/pkg/capnslog" diff --git a/etcdserver/api/v2http/http_test.go b/etcdserver/api/v2http/http_test.go index 21715e6a0..2b32a5b09 100644 --- a/etcdserver/api/v2http/http_test.go +++ b/etcdserver/api/v2http/http_test.go @@ -24,6 +24,7 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/go-semver/semver" @@ -33,20 +34,20 @@ import ( type fakeCluster struct { id uint64 clientURLs []string - members map[uint64]*etcdserver.Member + members map[uint64]*membership.Member } func (c *fakeCluster) ID() types.ID { return types.ID(c.id) } func (c *fakeCluster) ClientURLs() []string { return c.clientURLs } -func (c *fakeCluster) Members() []*etcdserver.Member { - var ms etcdserver.MembersByID +func (c *fakeCluster) Members() []*membership.Member { + var ms membership.MembersByID for _, m := range c.members { ms = append(ms, m) } sort.Sort(ms) - return []*etcdserver.Member(ms) + return []*membership.Member(ms) } -func (c *fakeCluster) Member(id types.ID) *etcdserver.Member { return c.members[uint64(id)] } +func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] } func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false } func (c *fakeCluster) Version() *semver.Version { return nil } @@ -66,13 +67,13 @@ func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { return fs.err } -func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { +func (fs *errServer) AddMember(ctx context.Context, m membership.Member) error { return fs.err } func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { return fs.err } -func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error { +func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) error { return fs.err } diff --git a/etcdserver/api/v2http/peer_test.go b/etcdserver/api/v2http/peer_test.go index 874f283ba..8ae6083e2 100644 --- a/etcdserver/api/v2http/peer_test.go +++ b/etcdserver/api/v2http/peer_test.go @@ -22,7 +22,7 @@ import ( "path" "testing" - "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/rafthttp" ) @@ -85,14 +85,14 @@ func TestServeMembersFails(t *testing.T) { } func TestServeMembersGet(t *testing.T) { - memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} - memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} + memb1 := membership.Member{ID: 1, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}} + memb2 := membership.Member{ID: 2, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}} cluster := &fakeCluster{ id: 1, - members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, + members: map[uint64]*membership.Member{1: &memb1, 2: &memb2}, } h := &peerMembersHandler{cluster: cluster} - msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) + msb, err := json.Marshal([]membership.Member{memb1, memb2}) if err != nil { t.Fatal(err) } diff --git a/etcdserver/api/v3rpc/member.go b/etcdserver/api/v3rpc/member.go index 8b5d64f47..351a998c2 100644 --- a/etcdserver/api/v3rpc/member.go +++ b/etcdserver/api/v3rpc/member.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/types" "golang.org/x/net/context" "google.golang.org/grpc" @@ -48,12 +49,12 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) } now := time.Now() - m := etcdserver.NewMember("", urls, "", &now) + m := membership.NewMember("", urls, "", &now) err = cs.server.AddMember(ctx, *m) switch { - case err == etcdserver.ErrIDExists: + case err == membership.ErrIDExists: return nil, rpctypes.ErrMemberExist - case err == etcdserver.ErrPeerURLexists: + case err == membership.ErrPeerURLexists: return nil, rpctypes.ErrPeerURLExist case err != nil: return nil, grpc.Errorf(codes.Internal, err.Error()) @@ -68,9 +69,9 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) { err := cs.server.RemoveMember(ctx, r.ID) switch { - case err == etcdserver.ErrIDRemoved: + case err == membership.ErrIDRemoved: fallthrough - case err == etcdserver.ErrIDNotFound: + case err == membership.ErrIDNotFound: return nil, rpctypes.ErrMemberNotFound case err != nil: return nil, grpc.Errorf(codes.Internal, err.Error()) @@ -80,15 +81,15 @@ func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveReq } func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) { - m := etcdserver.Member{ + m := membership.Member{ ID: types.ID(r.ID), - RaftAttributes: etcdserver.RaftAttributes{PeerURLs: r.PeerURLs}, + RaftAttributes: membership.RaftAttributes{PeerURLs: r.PeerURLs}, } err := cs.server.UpdateMember(ctx, m) switch { - case err == etcdserver.ErrPeerURLexists: + case err == membership.ErrPeerURLexists: return nil, rpctypes.ErrPeerURLExist - case err == etcdserver.ErrIDNotFound: + case err == membership.ErrIDNotFound: return nil, rpctypes.ErrMemberNotFound case err != nil: return nil, grpc.Errorf(codes.Internal, err.Error()) diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index ead1913cb..cd456f92e 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -22,6 +22,7 @@ import ( "sort" "time" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/version" @@ -30,7 +31,7 @@ import ( // isMemberBootstrapped tries to check if the given member has been bootstrapped // in the given cluster. -func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper, timeout time.Duration) bool { +func isMemberBootstrapped(cl *membership.RaftCluster, member string, rt http.RoundTripper, timeout time.Duration) bool { rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), timeout, false, rt) if err != nil { return false @@ -53,12 +54,12 @@ func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper, time // response, an error is returned. // Each request has a 10-second timeout. Because the upper limit of TTL is 5s, // 10 second is enough for building connection and finishing request. -func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) { +func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*membership.RaftCluster, error) { return getClusterFromRemotePeers(urls, 10*time.Second, true, rt) } // If logerr is true, it prints out more error messages. -func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) { +func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) { cc := &http.Client{ Transport: rt, Timeout: timeout, @@ -78,7 +79,7 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool } continue } - var membs []*Member + var membs []*membership.Member if err = json.Unmarshal(b, &membs); err != nil { if logerr { plog.Warningf("could not unmarshal cluster response: %v", err) @@ -92,14 +93,14 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool } continue } - return newClusterFromMembers("", id, membs), nil + return membership.NewClusterFromMembers("", id, membs), nil } return nil, fmt.Errorf("could not retrieve cluster information from the given urls") } // getRemotePeerURLs returns peer urls of remote members in the cluster. The // returned list is sorted in ascending lexicographical order. -func getRemotePeerURLs(cl *cluster, local string) []string { +func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string { us := make([]string, 0) for _, m := range cl.Members() { if m.Name == local { @@ -115,7 +116,7 @@ func getRemotePeerURLs(cl *cluster, local string) []string { // The key of the returned map is the member's ID. The value of the returned map // is the semver versions string, including server and cluster. // If it fails to get the version of a member, the key will be nil. -func getVersions(cl *cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { +func getVersions(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { members := cl.Members() vers := make(map[string]*version.Versions) for _, m := range members { @@ -173,7 +174,7 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version { // cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version // out of the range. // We set this rule since when the local member joins, another member might be offline. -func isCompatibleWithCluster(cl *cluster, local types.ID, rt http.RoundTripper) bool { +func isCompatibleWithCluster(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool { vers := getVersions(cl, local, rt) minV := semver.Must(semver.NewVersion(version.MinClusterVersion)) maxV := semver.Must(semver.NewVersion(version.Version)) @@ -215,7 +216,7 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min // getVersion returns the Versions of the given member via its // peerURLs. Returns the last error if it fails to get the version. -func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) { +func getVersion(m *membership.Member, rt http.RoundTripper) (*version.Versions, error) { cc := &http.Client{ Transport: rt, } @@ -255,12 +256,3 @@ func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) { } return nil, err } - -func MustDetectDowngrade(cv *semver.Version) { - lv := semver.Must(semver.NewVersion(version.Version)) - // only keep major.minor version for comparison against cluster version - lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} - if cv != nil && lv.LessThan(*cv) { - plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String())) - } -} diff --git a/etcdserver/errors.go b/etcdserver/errors.go index 07657f011..6731c117e 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -17,17 +17,11 @@ package etcdserver import ( "errors" "fmt" - - etcdErr "github.com/coreos/etcd/error" ) var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") - ErrIDRemoved = errors.New("etcdserver: ID removed") - ErrIDExists = errors.New("etcdserver: ID exists") - ErrIDNotFound = errors.New("etcdserver: ID not found") - ErrPeerURLexists = errors.New("etcdserver: peerURL exists") ErrCanceled = errors.New("etcdserver: request cancelled") ErrTimeout = errors.New("etcdserver: request timed out") ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") @@ -38,11 +32,6 @@ var ( ErrNoSpace = errors.New("etcdserver: no space") ) -func isKeyNotFound(err error) bool { - e, ok := err.(*etcdErr.Error) - return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound -} - type DiscoveryError struct { Op string Err error diff --git a/etcdserver/cluster.go b/etcdserver/membership/cluster.go similarity index 73% rename from etcdserver/cluster.go rename to etcdserver/membership/cluster.go index 2365237fb..7a6108454 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/membership/cluster.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package membership import ( "bytes" @@ -34,13 +34,8 @@ import ( "github.com/coreos/go-semver/semver" ) -const ( - raftAttributesSuffix = "raftAttributes" - attributesSuffix = "attributes" -) - -// Cluster is a list of Members that belong to the same raft cluster -type cluster struct { +// RaftCluster is a list of Members that belong to the same raft cluster +type RaftCluster struct { id types.ID token string store store.Store @@ -53,8 +48,8 @@ type cluster struct { removed map[types.ID]bool } -func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error) { - c := newCluster(token) +func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, error) { + c := NewCluster(token) for name, urls := range urlsmap { m := NewMember(name, urls, token, nil) if _, ok := c.members[m.ID]; ok { @@ -69,8 +64,8 @@ func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error return c, nil } -func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster { - c := newCluster(token) +func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftCluster { + c := NewCluster(token) c.id = id for _, m := range membs { c.members[m.ID] = m @@ -78,17 +73,17 @@ func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster return c } -func newCluster(token string) *cluster { - return &cluster{ +func NewCluster(token string) *RaftCluster { + return &RaftCluster{ token: token, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool), } } -func (c *cluster) ID() types.ID { return c.id } +func (c *RaftCluster) ID() types.ID { return c.id } -func (c *cluster) Members() []*Member { +func (c *RaftCluster) Members() []*Member { c.Lock() defer c.Unlock() var ms MembersByID @@ -99,7 +94,7 @@ func (c *cluster) Members() []*Member { return []*Member(ms) } -func (c *cluster) Member(id types.ID) *Member { +func (c *RaftCluster) Member(id types.ID) *Member { c.Lock() defer c.Unlock() return c.members[id].Clone() @@ -107,7 +102,7 @@ func (c *cluster) Member(id types.ID) *Member { // MemberByName returns a Member with the given name if exists. // If more than one member has the given name, it will panic. -func (c *cluster) MemberByName(name string) *Member { +func (c *RaftCluster) MemberByName(name string) *Member { c.Lock() defer c.Unlock() var memb *Member @@ -122,7 +117,7 @@ func (c *cluster) MemberByName(name string) *Member { return memb.Clone() } -func (c *cluster) MemberIDs() []types.ID { +func (c *RaftCluster) MemberIDs() []types.ID { c.Lock() defer c.Unlock() var ids []types.ID @@ -133,7 +128,7 @@ func (c *cluster) MemberIDs() []types.ID { return ids } -func (c *cluster) IsIDRemoved(id types.ID) bool { +func (c *RaftCluster) IsIDRemoved(id types.ID) bool { c.Lock() defer c.Unlock() return c.removed[id] @@ -141,7 +136,7 @@ func (c *cluster) IsIDRemoved(id types.ID) bool { // PeerURLs returns a list of all peer addresses. // The returned list is sorted in ascending lexicographical order. -func (c *cluster) PeerURLs() []string { +func (c *RaftCluster) PeerURLs() []string { c.Lock() defer c.Unlock() urls := make([]string, 0) @@ -156,7 +151,7 @@ func (c *cluster) PeerURLs() []string { // ClientURLs returns a list of all client addresses. // The returned list is sorted in ascending lexicographical order. -func (c *cluster) ClientURLs() []string { +func (c *RaftCluster) ClientURLs() []string { c.Lock() defer c.Unlock() urls := make([]string, 0) @@ -169,7 +164,7 @@ func (c *cluster) ClientURLs() []string { return urls } -func (c *cluster) String() string { +func (c *RaftCluster) String() string { c.Lock() defer c.Unlock() b := &bytes.Buffer{} @@ -187,7 +182,7 @@ func (c *cluster) String() string { return b.String() } -func (c *cluster) genID() { +func (c *RaftCluster) genID() { mIDs := c.MemberIDs() b := make([]byte, 8*len(mIDs)) for i, id := range mIDs { @@ -197,17 +192,17 @@ func (c *cluster) genID() { c.id = types.ID(binary.BigEndian.Uint64(hash[:8])) } -func (c *cluster) SetID(id types.ID) { c.id = id } +func (c *RaftCluster) SetID(id types.ID) { c.id = id } -func (c *cluster) SetStore(st store.Store) { c.store = st } +func (c *RaftCluster) SetStore(st store.Store) { c.store = st } -func (c *cluster) Recover() { +func (c *RaftCluster) Recover() { c.Lock() defer c.Unlock() c.members, c.removed = membersFromStore(c.store) c.version = clusterVersionFromStore(c.store) - MustDetectDowngrade(c.version) + mustDetectDowngrade(c.version) for _, m := range c.members { plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id) @@ -219,7 +214,7 @@ func (c *cluster) Recover() { // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. -func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { +func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { members, removed := membersFromStore(c.store) id := types.ID(cc.NodeID) if removed[id] { @@ -280,36 +275,42 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember adds a new Member into the cluster, and saves the given member's // raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. -func (c *cluster) AddMember(m *Member) { +func (c *RaftCluster) AddMember(m *Member) { c.Lock() defer c.Unlock() - b, err := json.Marshal(m.RaftAttributes) - if err != nil { - plog.Panicf("marshal raftAttributes should never fail: %v", err) - } - p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix) - if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { - plog.Panicf("create raftAttributes should never fail: %v", err) + if c.store != nil { + b, err := json.Marshal(m.RaftAttributes) + if err != nil { + plog.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) + if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { + plog.Panicf("create raftAttributes should never fail: %v", err) + } } c.members[m.ID] = m } // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. -func (c *cluster) RemoveMember(id types.ID) { +func (c *RaftCluster) RemoveMember(id types.ID) { c.Lock() defer c.Unlock() - if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { - plog.Panicf("delete member should never fail: %v", err) + if c.store != nil { + if _, err := c.store.Delete(MemberStoreKey(id), true, true); err != nil { + plog.Panicf("delete member should never fail: %v", err) + } } delete(c.members, id) - if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { - plog.Panicf("create removedMember should never fail: %v", err) + if c.store != nil { + if _, err := c.store.Create(RemovedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { + plog.Panicf("create removedMember should never fail: %v", err) + } } c.removed[id] = true } -func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) bool { +func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) bool { c.Lock() defer c.Unlock() if m, ok := c.members[id]; ok { @@ -326,21 +327,24 @@ func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) bool { return false } -func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { +func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { c.Lock() defer c.Unlock() - b, err := json.Marshal(raftAttr) - if err != nil { - plog.Panicf("marshal raftAttributes should never fail: %v", err) - } - p := path.Join(memberStoreKey(id), raftAttributesSuffix) - if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { - plog.Panicf("update raftAttributes should never fail: %v", err) + + if c.store != nil { + b, err := json.Marshal(raftAttr) + if err != nil { + plog.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(MemberStoreKey(id), raftAttributesSuffix) + if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { + plog.Panicf("update raftAttributes should never fail: %v", err) + } } c.members[id].RaftAttributes = raftAttr } -func (c *cluster) Version() *semver.Version { +func (c *RaftCluster) Version() *semver.Version { c.Lock() defer c.Unlock() if c.version == nil { @@ -349,7 +353,7 @@ func (c *cluster) Version() *semver.Version { return semver.Must(semver.NewVersion(c.version.String())) } -func (c *cluster) SetVersion(ver *semver.Version) { +func (c *RaftCluster) SetVersion(ver *semver.Version) { c.Lock() defer c.Unlock() if c.version != nil { @@ -358,10 +362,10 @@ func (c *cluster) SetVersion(ver *semver.Version) { plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String())) } c.version = ver - MustDetectDowngrade(c.version) + mustDetectDowngrade(c.version) } -func (c *cluster) isReadyToAddNewMember() bool { +func (c *RaftCluster) IsReadyToAddNewMember() bool { nmembers := 1 nstarted := 0 @@ -389,7 +393,7 @@ func (c *cluster) isReadyToAddNewMember() bool { return true } -func (c *cluster) isReadyToRemoveMember(id uint64) bool { +func (c *RaftCluster) IsReadyToRemoveMember(id uint64) bool { nmembers := 0 nstarted := 0 @@ -416,7 +420,7 @@ func (c *cluster) isReadyToRemoveMember(id uint64) bool { func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) { members := make(map[types.ID]*Member) removed := make(map[types.ID]bool) - e, err := st.Get(storeMembersPrefix, true, true) + e, err := st.Get(StoreMembersPrefix, true, true) if err != nil { if isKeyNotFound(err) { return members, removed @@ -440,13 +444,13 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) plog.Panicf("get storeRemovedMembers should never fail: %v", err) } for _, n := range e.Node.Nodes { - removed[mustParseMemberIDFromKey(n.Key)] = true + removed[MustParseMemberIDFromKey(n.Key)] = true } return members, removed } func clusterVersionFromStore(st store.Store) *semver.Version { - e, err := st.Get(path.Join(StoreClusterPrefix, "version"), false, false) + e, err := st.Get(path.Join(storePrefix, "version"), false, false) if err != nil { if isKeyNotFound(err) { return nil @@ -460,7 +464,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version { // with the existing cluster. If the validation succeeds, it assigns the IDs // from the existing cluster to the local cluster. // If the validation fails, an error will be returned. -func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error { +func ValidateClusterAndAssignIDs(local *RaftCluster, existing *RaftCluster) error { ems := existing.Members() lms := local.Members() if len(ems) != len(lms) { @@ -481,3 +485,12 @@ func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error { } return nil } + +func mustDetectDowngrade(cv *semver.Version) { + lv := semver.Must(semver.NewVersion(version.Version)) + // only keep major.minor version for comparison against cluster version + lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} + if cv != nil && lv.LessThan(*cv) { + plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String())) + } +} diff --git a/etcdserver/cluster_test.go b/etcdserver/membership/cluster_test.go similarity index 97% rename from etcdserver/cluster_test.go rename to etcdserver/membership/cluster_test.go index 968acddc4..14e38863b 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/membership/cluster_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package membership import ( "encoding/json" @@ -274,7 +274,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) { } func TestClusterValidateConfigurationChange(t *testing.T) { - cl := newCluster("") + cl := NewCluster("") cl.SetStore(store.New()) for i := 1; i <= 4; i++ { attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}} @@ -457,7 +457,7 @@ func TestClusterAddMember(t *testing.T) { { Name: "Create", Params: []interface{}{ - path.Join(storeMembersPrefix, "1", "raftAttributes"), + path.Join(StoreMembersPrefix, "1", "raftAttributes"), false, `{"peerURLs":null}`, false, @@ -471,7 +471,7 @@ func TestClusterAddMember(t *testing.T) { } func TestClusterMembers(t *testing.T) { - cls := &cluster{ + cls := &RaftCluster{ members: map[types.ID]*Member{ 1: {ID: 1}, 20: {ID: 20}, @@ -499,8 +499,8 @@ func TestClusterRemoveMember(t *testing.T) { c.RemoveMember(1) wactions := []testutil.Action{ - {Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}}, - {Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}}, + {Name: "Delete", Params: []interface{}{MemberStoreKey(1), true, true}}, + {Name: "Create", Params: []interface{}{RemovedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}}, } if !reflect.DeepEqual(st.Action(), wactions) { t.Errorf("actions = %v, want %v", st.Action(), wactions) @@ -558,8 +558,8 @@ func TestNodeToMember(t *testing.T) { } } -func newTestCluster(membs []*Member) *cluster { - c := &cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} +func newTestCluster(membs []*Member) *RaftCluster { + c := &RaftCluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} for _, m := range membs { c.members[m.ID] = m } @@ -642,7 +642,7 @@ func TestIsReadyToAddNewMember(t *testing.T) { } for i, tt := range tests { c := newTestCluster(tt.members) - if got := c.isReadyToAddNewMember(); got != tt.want { + if got := c.IsReadyToAddNewMember(); got != tt.want { t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want) } } @@ -727,7 +727,7 @@ func TestIsReadyToRemoveMember(t *testing.T) { } for i, tt := range tests { c := newTestCluster(tt.members) - if got := c.isReadyToRemoveMember(tt.removeID); got != tt.want { + if got := c.IsReadyToRemoveMember(tt.removeID); got != tt.want { t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want) } } diff --git a/etcdserver/membership/errors.go b/etcdserver/membership/errors.go new file mode 100644 index 000000000..a8605e3eb --- /dev/null +++ b/etcdserver/membership/errors.go @@ -0,0 +1,33 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package membership + +import ( + "errors" + + etcdErr "github.com/coreos/etcd/error" +) + +var ( + ErrIDRemoved = errors.New("membership: ID removed") + ErrIDExists = errors.New("membership: ID exists") + ErrIDNotFound = errors.New("membership: ID not found") + ErrPeerURLexists = errors.New("membership: peerURL exists") +) + +func isKeyNotFound(err error) bool { + e, ok := err.(*etcdErr.Error) + return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound +} diff --git a/etcdserver/member.go b/etcdserver/membership/member.go similarity index 82% rename from etcdserver/member.go rename to etcdserver/membership/member.go index e56881a51..7fdb39986 100644 --- a/etcdserver/member.go +++ b/etcdserver/membership/member.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package membership import ( "crypto/sha1" @@ -26,11 +26,24 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/store" + "github.com/coreos/pkg/capnslog" ) var ( - storeMembersPrefix = path.Join(StoreClusterPrefix, "members") - storeRemovedMembersPrefix = path.Join(StoreClusterPrefix, "removed_members") + plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "membership") + + StoreMembersPrefix = path.Join(storePrefix, "members") + storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") +) + +const ( + // TODO: make this private after moving all membership storage logic + // from etcdserver pkg + AttributesSuffix = "attributes" + raftAttributesSuffix = "raftAttributes" + + // the prefix for stroing membership related information in store provided by store pkg. + storePrefix = "/0" ) // RaftAttributes represents the raft related attributes of an etcd member. @@ -110,15 +123,15 @@ func (m *Member) IsStarted() bool { return len(m.Name) != 0 } -func memberStoreKey(id types.ID) string { - return path.Join(storeMembersPrefix, id.String()) +func MemberStoreKey(id types.ID) string { + return path.Join(StoreMembersPrefix, id.String()) } func MemberAttributesStorePath(id types.ID) string { - return path.Join(memberStoreKey(id), attributesSuffix) + return path.Join(MemberStoreKey(id), AttributesSuffix) } -func mustParseMemberIDFromKey(key string) types.ID { +func MustParseMemberIDFromKey(key string) types.ID { id, err := types.IDFromString(path.Base(key)) if err != nil { plog.Panicf("unexpected parse member id error: %v", err) @@ -126,17 +139,17 @@ func mustParseMemberIDFromKey(key string) types.ID { return id } -func removedMemberStoreKey(id types.ID) string { +func RemovedMemberStoreKey(id types.ID) string { return path.Join(storeRemovedMembersPrefix, id.String()) } -// nodeToMember builds member from a key value node. +// NodeToMember builds member from a key value node. // the child nodes of the given node MUST be sorted by key. func nodeToMember(n *store.NodeExtern) (*Member, error) { - m := &Member{ID: mustParseMemberIDFromKey(n.Key)} + m := &Member{ID: MustParseMemberIDFromKey(n.Key)} attrs := make(map[string][]byte) raftAttrKey := path.Join(n.Key, raftAttributesSuffix) - attrKey := path.Join(n.Key, attributesSuffix) + attrKey := path.Join(n.Key, AttributesSuffix) for _, nn := range n.Nodes { if nn.Key != raftAttrKey && nn.Key != attrKey { return nil, fmt.Errorf("unknown key %q", nn.Key) diff --git a/etcdserver/member_test.go b/etcdserver/membership/member_test.go similarity index 99% rename from etcdserver/member_test.go rename to etcdserver/membership/member_test.go index 2159282a2..16bc15390 100644 --- a/etcdserver/member_test.go +++ b/etcdserver/membership/member_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver +package membership import ( "net/url" diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 803d5cb55..a936afee7 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -24,6 +24,7 @@ import ( "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" @@ -279,7 +280,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) { } } -func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { +func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { var err error member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( @@ -323,7 +324,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r return } -func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -331,7 +332,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) - cl := newCluster("") + cl := membership.NewCluster("") cl.SetID(cid) s := raft.NewMemoryStorage() if snapshot != nil { @@ -357,7 +358,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust return id, cl, n, s, w } -func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -387,7 +388,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type } plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) - cl := newCluster("") + cl := membership.NewCluster("") cl.SetID(cid) s := raft.NewMemoryStorage() if snapshot != nil { @@ -473,9 +474,9 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf next++ } if !found { - m := Member{ + m := membership.Member{ ID: types.ID(self), - RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}}, + RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}}, } ctx, err := json.Marshal(m) if err != nil { diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index e46131ffd..3bf3eb7d9 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/mock/mockstorage" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" @@ -71,9 +72,9 @@ func TestGetIDs(t *testing.T) { } func TestCreateConfigChangeEnts(t *testing.T) { - m := Member{ + m := membership.Member{ ID: types.ID(1), - RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}}, + RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}}, } ctx, err := json.Marshal(m) if err != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index 1b2a0f7fb..2a0d9c998 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -33,6 +33,7 @@ import ( "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/pkg/fileutil" @@ -82,7 +83,7 @@ const ( var ( plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver") - storeMemberAttributeRegexp = regexp.MustCompile(path.Join(storeMembersPrefix, "[[:xdigit:]]{1,16}", attributesSuffix)) + storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")) ) func init() { @@ -126,7 +127,7 @@ type Server interface { // AddMember attempts to add a member into the cluster. It will return // ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDExists if member ID exists in the cluster. - AddMember(ctx context.Context, memb Member) error + AddMember(ctx context.Context, memb membership.Member) error // RemoveMember attempts to remove a member from the cluster. It will // return ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDNotFound if member ID is not in the cluster. @@ -134,7 +135,7 @@ type Server interface { // UpdateMember attempts to update an existing member in the cluster. It will // return ErrIDNotFound if the member ID does not exist. - UpdateMember(ctx context.Context, updateMemb Member) error + UpdateMember(ctx context.Context, updateMemb membership.Member) error // ClusterVersion is the cluster-wide minimum major.minor version. // Cluster version is set to the min version that an etcd member is @@ -167,9 +168,9 @@ type EtcdServer struct { done chan struct{} errorc chan error id types.ID - attributes Attributes + attributes membership.Attributes - cluster *cluster + cluster *membership.RaftCluster store store.Store @@ -216,7 +217,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { n raft.Node s *raft.MemoryStorage id types.ID - cl *cluster + cl *membership.RaftCluster ) if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil { @@ -239,13 +240,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } - var remotes []*Member + var remotes []*membership.Member switch { case !haveWAL && !cfg.NewCluster: if err := cfg.VerifyJoinExisting(); err != nil { return nil, err } - cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) if err != nil { return nil, err } @@ -253,7 +254,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) } - if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { + if err := membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) { @@ -261,7 +262,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } remotes = existingCluster.Members() - cl.SetID(existingCluster.id) + cl.SetID(existingCluster.ID()) cl.SetStore(st) cfg.Print() id, n, s, w = startNode(cfg, cl, nil) @@ -269,7 +270,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := cfg.VerifyBootstrap(); err != nil { return nil, err } - cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) if err != nil { return nil, err } @@ -291,7 +292,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if checkDuplicateURL(urlsmap) { return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) } - if cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil { + if cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil { return nil, err } } @@ -357,7 +358,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { storage: NewStorage(w, ss), }, id: id, - attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: cl, stats: sstats, lstats: lstats, @@ -466,7 +467,7 @@ func (s *EtcdServer) purgeFile() { func (s *EtcdServer) ID() types.ID { return s.id } -func (s *EtcdServer) Cluster() *cluster { return s.cluster } +func (s *EtcdServer) Cluster() *membership.RaftCluster { return s.cluster } func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } @@ -792,8 +793,8 @@ func (s *EtcdServer) LeaderStats() []byte { func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } -func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { - if s.cfg.StrictReconfigCheck && !s.cluster.isReadyToAddNewMember() { +func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error { + if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() { // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // In such a case adding a new member is allowed unconditionally return ErrNotEnoughStartedMembers @@ -813,7 +814,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { } func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { - if s.cfg.StrictReconfigCheck && !s.cluster.isReadyToRemoveMember(id) { + if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) { // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // In such a case removing a member is allowed unconditionally return ErrNotEnoughStartedMembers @@ -826,7 +827,7 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { return s.configure(ctx, cc) } -func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { +func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) error { b, err := json.Marshal(memb) if err != nil { return err @@ -914,7 +915,7 @@ func (s *EtcdServer) publish(timeout time.Duration) { } req := pb.Request{ Method: "PUT", - Path: MemberAttributesStorePath(s.id), + Path: membership.MemberAttributesStorePath(s.id), Val: string(b), } @@ -1093,8 +1094,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { // TODO (yicheng): cluster should be the owner of cluster prefix store // we should not modify cluster store here. if storeMemberAttributeRegexp.MatchString(r.Path) { - id := mustParseMemberIDFromKey(path.Dir(r.Path)) - var attr Attributes + id := membership.MustParseMemberIDFromKey(path.Dir(r.Path)) + var attr membership.Attributes if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) } @@ -1137,7 +1138,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con *confState = *s.r.ApplyConfChange(cc) switch cc.Type { case raftpb.ConfChangeAddNode: - m := new(Member) + m := new(membership.Member) if err := json.Unmarshal(cc.Context, m); err != nil { plog.Panicf("unmarshal member should never fail: %v", err) } @@ -1161,7 +1162,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID()) } case raftpb.ConfChangeUpdateNode: - m := new(Member) + m := new(membership.Member) if err := json.Unmarshal(cc.Context, m); err != nil { plog.Panicf("unmarshal member should never fail: %v", err) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ffecedef4..83e86e420 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -26,6 +26,7 @@ import ( "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/mock/mockstorage" @@ -166,7 +167,7 @@ func TestApplyRepeat(t *testing.T) { cl := newTestCluster(nil) st := store.New() cl.SetStore(store.New()) - cl.AddMember(&Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}) s := &EtcdServer{ r: raftNode{ Node: n, @@ -457,7 +458,7 @@ func TestApplyRequest(t *testing.T) { } func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { - cl := newTestCluster([]*Member{{ID: 1}}) + cl := newTestCluster([]*membership.Member{{ID: 1}}) srv := &EtcdServer{ store: mockstore.NewRecorder(), cluster: cl, @@ -465,21 +466,21 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { req := pb.Request{ Method: "PUT", ID: 1, - Path: path.Join(storeMembersPrefix, strconv.FormatUint(1, 16), attributesSuffix), + Path: path.Join(membership.StoreMembersPrefix, strconv.FormatUint(1, 16), membership.AttributesSuffix), Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`, } srv.applyRequest(req) - w := Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}} + w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}} if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) { t.Errorf("attributes = %v, want %v", g, w) } } func TestApplyConfChangeError(t *testing.T) { - cl := newCluster("") + cl := membership.NewCluster("") cl.SetStore(store.New()) for i := 1; i <= 4; i++ { - cl.AddMember(&Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}) } cl.RemoveMember(4) @@ -492,28 +493,28 @@ func TestApplyConfChangeError(t *testing.T) { Type: raftpb.ConfChangeAddNode, NodeID: 4, }, - ErrIDRemoved, + membership.ErrIDRemoved, }, { raftpb.ConfChange{ Type: raftpb.ConfChangeUpdateNode, NodeID: 4, }, - ErrIDRemoved, + membership.ErrIDRemoved, }, { raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, NodeID: 1, }, - ErrIDExists, + membership.ErrIDExists, }, { raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, NodeID: 5, }, - ErrIDNotFound, + membership.ErrIDNotFound, }, } for i, tt := range tests { @@ -541,10 +542,10 @@ func TestApplyConfChangeError(t *testing.T) { } func TestApplyConfChangeShouldStop(t *testing.T) { - cl := newCluster("") + cl := membership.NewCluster("") cl.SetStore(store.New()) for i := 1; i <= 3; i++ { - cl.AddMember(&Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}) } srv := &EtcdServer{ id: 1, @@ -581,10 +582,10 @@ func TestApplyConfChangeShouldStop(t *testing.T) { // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop // if the local member is removed along with other conf updates. func TestApplyMultiConfChangeShouldStop(t *testing.T) { - cl := newCluster("") + cl := membership.NewCluster("") cl.SetStore(store.New()) for i := 1; i <= 5; i++ { - cl.AddMember(&Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}) } srv := &EtcdServer{ id: 2, @@ -922,8 +923,9 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { maxInFlightMsgSnap = 16 ) n := newNopReadyNode() - cl := newCluster("abc") - cl.SetStore(store.New()) + st := store.New() + cl := membership.NewCluster("abc") + cl.SetStore(st) testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir") if err != nil { @@ -946,7 +948,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { storage: mockstorage.NewStorageRecorder(testdir), raftStorage: rs, }, - store: cl.store, + store: st, cluster: cl, msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } @@ -1032,7 +1034,7 @@ func TestAddMember(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() - m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}} + m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} err := s.AddMember(context.TODO(), m) gaction := n.Action() s.Stop() @@ -1058,7 +1060,7 @@ func TestRemoveMember(t *testing.T) { cl := newTestCluster(nil) st := store.New() cl.SetStore(store.New()) - cl.AddMember(&Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}) s := &EtcdServer{ r: raftNode{ Node: n, @@ -1097,7 +1099,7 @@ func TestUpdateMember(t *testing.T) { cl := newTestCluster(nil) st := store.New() cl.SetStore(st) - cl.AddMember(&Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}) s := &EtcdServer{ r: raftNode{ Node: n, @@ -1110,7 +1112,7 @@ func TestUpdateMember(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() - wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} + wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} err := s.UpdateMember(context.TODO(), wm) gaction := n.Action() s.Stop() @@ -1139,8 +1141,8 @@ func TestPublish(t *testing.T) { cfg: &ServerConfig{TickMs: 1}, id: 1, r: raftNode{Node: n}, - attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, - cluster: &cluster{}, + attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, + cluster: &membership.RaftCluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1161,11 +1163,11 @@ func TestPublish(t *testing.T) { if r.Method != "PUT" { t.Errorf("method = %s, want PUT", r.Method) } - wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}} - if wpath := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != wpath { + wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}} + if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath { t.Errorf("path = %s, want %s", r.Path, wpath) } - var gattr Attributes + var gattr membership.Attributes if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil { t.Fatalf("unmarshal val error: %v", err) } @@ -1182,7 +1184,7 @@ func TestPublishStopped(t *testing.T) { Node: newNodeNop(), transport: rafthttp.NewNopTransporter(), }, - cluster: &cluster{}, + cluster: &membership.RaftCluster{}, w: mockwait.NewNop(), done: make(chan struct{}), stop: make(chan struct{}), @@ -1223,8 +1225,8 @@ func TestUpdateVersion(t *testing.T) { id: 1, cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, - attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, - cluster: &cluster{}, + attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, + cluster: &membership.RaftCluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1279,39 +1281,36 @@ func TestStopNotify(t *testing.T) { func TestGetOtherPeerURLs(t *testing.T) { tests := []struct { - membs []*Member - self string + membs []*membership.Member wurls []string }{ { - []*Member{ - newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), + []*membership.Member{ + membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil), }, - "a", []string{}, }, { - []*Member{ - newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), - newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), - newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), + []*membership.Member{ + membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil), + membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil), + membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil), }, - "a", - []string{"http://10.0.0.2", "http://10.0.0.3"}, + []string{"http://10.0.0.2:2", "http://10.0.0.3:3"}, }, { - []*Member{ - newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), - newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), - newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), + []*membership.Member{ + membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil), + membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil), + membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil), }, - "a", - []string{"http://10.0.0.2", "http://10.0.0.3"}, + []string{"http://10.0.0.2:2", "http://10.0.0.3:3"}, }, } for i, tt := range tests { - cl := newClusterFromMembers("", types.ID(0), tt.membs) - urls := getRemotePeerURLs(cl, tt.self) + cl := membership.NewClusterFromMembers("", types.ID(0), tt.membs) + self := "1" + urls := getRemotePeerURLs(cl, self) if !reflect.DeepEqual(urls, tt.wurls) { t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls) } @@ -1440,3 +1439,11 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { } return nil } + +func newTestCluster(membs []*membership.Member) *membership.RaftCluster { + c := membership.NewCluster("") + for _, m := range membs { + c.AddMember(m) + } + return c +} diff --git a/etcdserver/util.go b/etcdserver/util.go index e02a1d0a1..0f4c3772b 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -17,13 +17,14 @@ package etcdserver import ( "time" + "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" ) // isConnectedToQuorumSince checks whether the local member is connected to the // quorum of the cluster since the given time. -func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool { +func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { var connectedNum int for _, m := range members { if m.ID == self || isConnectedSince(transport, since, m.ID) { diff --git a/pkg/types/urls.go b/pkg/types/urls.go index ce2483ffa..8d5ff6248 100644 --- a/pkg/types/urls.go +++ b/pkg/types/urls.go @@ -53,6 +53,14 @@ func NewURLs(strs []string) (URLs, error) { return us, nil } +func MustNewURLs(strs []string) URLs { + urls, err := NewURLs(strs) + if err != nil { + panic(err) + } + return urls +} + func (us URLs) String() string { return strings.Join(us.StringSlice(), ",") }