From a6a649f1c319c53511d68d450429fee5bed96dd3 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 12 May 2015 17:22:06 -0700 Subject: [PATCH] etcdserver: stop exposing Cluster struct After this PR, only cluster's interface Cluster is exposed, which makes code much cleaner. And it avoids external packages to rely on cluster struct in the future. --- etcdmain/etcd.go | 2 +- etcdserver/cluster.go | 56 +++++++++--------- etcdserver/cluster_test.go | 6 +- etcdserver/cluster_util.go | 12 ++-- etcdserver/etcdhttp/client.go | 54 ++++++++--------- etcdserver/etcdhttp/client_security.go | 14 ++--- etcdserver/etcdhttp/client_test.go | 82 +++++++++++++------------- etcdserver/etcdhttp/peer.go | 10 ++-- etcdserver/etcdhttp/peer_test.go | 4 +- etcdserver/raft.go | 6 +- etcdserver/server.go | 62 +++++++++---------- etcdserver/server_test.go | 24 ++++---- integration/cluster_test.go | 2 +- 13 files changed, 168 insertions(+), 166 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 68ceb6af6..9925d0839 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -203,7 +203,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Handler: etcdhttp.NewClientHandler(s), Info: cfg.corsInfo, } - ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler()) + ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler()) // Start the peer server in a goroutine for _, l := range plns { go func(l net.Listener) { diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 90868cc1d..3779e84a3 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -39,7 +39,7 @@ const ( attributesSuffix = "attributes" ) -type ClusterInfo interface { +type Cluster interface { // ID returns the cluster ID ID() types.ID // ClientURLs returns an aggregate set of all URLs on which this @@ -56,7 +56,7 @@ type ClusterInfo interface { } // Cluster is a list of Members that belong to the same raft cluster -type Cluster struct { +type cluster struct { id types.ID token string store store.Store @@ -69,9 +69,9 @@ type Cluster struct { removed map[types.ID]bool } -func NewCluster(token string, initial types.URLsMap) (*Cluster, error) { +func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error) { c := newCluster(token) - for name, urls := range initial { + for name, urls := range urlsmap { m := NewMember(name, urls, token, nil) if _, ok := c.members[m.ID]; ok { return nil, fmt.Errorf("member exists with identical ID %v", m) @@ -85,7 +85,7 @@ func NewCluster(token string, initial types.URLsMap) (*Cluster, error) { return c, nil } -func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster { +func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster { c := newCluster(token) c.id = id for _, m := range membs { @@ -94,17 +94,17 @@ func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster return c } -func newCluster(token string) *Cluster { - return &Cluster{ +func newCluster(token string) *cluster { + return &cluster{ token: token, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool), } } -func (c *Cluster) ID() types.ID { return c.id } +func (c *cluster) ID() types.ID { return c.id } -func (c *Cluster) Members() []*Member { +func (c *cluster) Members() []*Member { c.Lock() defer c.Unlock() var sms SortableMemberSlice @@ -115,7 +115,7 @@ func (c *Cluster) Members() []*Member { return []*Member(sms) } -func (c *Cluster) Member(id types.ID) *Member { +func (c *cluster) Member(id types.ID) *Member { c.Lock() defer c.Unlock() return c.members[id].Clone() @@ -123,7 +123,7 @@ func (c *Cluster) Member(id types.ID) *Member { // MemberByName returns a Member with the given name if exists. // If more than one member has the given name, it will panic. -func (c *Cluster) MemberByName(name string) *Member { +func (c *cluster) MemberByName(name string) *Member { c.Lock() defer c.Unlock() var memb *Member @@ -138,7 +138,7 @@ func (c *Cluster) MemberByName(name string) *Member { return memb.Clone() } -func (c *Cluster) MemberIDs() []types.ID { +func (c *cluster) MemberIDs() []types.ID { c.Lock() defer c.Unlock() var ids []types.ID @@ -149,7 +149,7 @@ func (c *Cluster) MemberIDs() []types.ID { return ids } -func (c *Cluster) IsIDRemoved(id types.ID) bool { +func (c *cluster) IsIDRemoved(id types.ID) bool { c.Lock() defer c.Unlock() return c.removed[id] @@ -157,7 +157,7 @@ func (c *Cluster) IsIDRemoved(id types.ID) bool { // PeerURLs returns a list of all peer addresses. // The returned list is sorted in ascending lexicographical order. -func (c *Cluster) PeerURLs() []string { +func (c *cluster) PeerURLs() []string { c.Lock() defer c.Unlock() urls := make([]string, 0) @@ -172,7 +172,7 @@ func (c *Cluster) PeerURLs() []string { // ClientURLs returns a list of all client addresses. // The returned list is sorted in ascending lexicographical order. -func (c *Cluster) ClientURLs() []string { +func (c *cluster) ClientURLs() []string { c.Lock() defer c.Unlock() urls := make([]string, 0) @@ -185,7 +185,7 @@ func (c *Cluster) ClientURLs() []string { return urls } -func (c *Cluster) String() string { +func (c *cluster) String() string { c.Lock() defer c.Unlock() b := &bytes.Buffer{} @@ -203,7 +203,7 @@ func (c *Cluster) String() string { return b.String() } -func (c *Cluster) genID() { +func (c *cluster) genID() { mIDs := c.MemberIDs() b := make([]byte, 8*len(mIDs)) for i, id := range mIDs { @@ -213,18 +213,18 @@ func (c *Cluster) genID() { c.id = types.ID(binary.BigEndian.Uint64(hash[:8])) } -func (c *Cluster) SetID(id types.ID) { c.id = id } +func (c *cluster) SetID(id types.ID) { c.id = id } -func (c *Cluster) SetStore(st store.Store) { c.store = st } +func (c *cluster) SetStore(st store.Store) { c.store = st } -func (c *Cluster) Recover() { +func (c *cluster) Recover() { c.members, c.removed = membersFromStore(c.store) c.version = clusterVersionFromStore(c.store) } // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. -func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { +func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { members, removed := membersFromStore(c.store) id := types.ID(cc.NodeID) if removed[id] { @@ -285,7 +285,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember adds a new Member into the cluster, and saves the given member's // raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. -func (c *Cluster) AddMember(m *Member) { +func (c *cluster) AddMember(m *Member) { c.Lock() defer c.Unlock() b, err := json.Marshal(m.RaftAttributes) @@ -301,7 +301,7 @@ func (c *Cluster) AddMember(m *Member) { // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. -func (c *Cluster) RemoveMember(id types.ID) { +func (c *cluster) RemoveMember(id types.ID) { c.Lock() defer c.Unlock() if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { @@ -314,14 +314,14 @@ func (c *Cluster) RemoveMember(id types.ID) { c.removed[id] = true } -func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) { +func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) { c.Lock() defer c.Unlock() c.members[id].Attributes = attr // TODO: update store in this function } -func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { +func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { c.Lock() defer c.Unlock() b, err := json.Marshal(raftAttr) @@ -335,7 +335,7 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { c.members[id].RaftAttributes = raftAttr } -func (c *Cluster) Version() *semver.Version { +func (c *cluster) Version() *semver.Version { c.Lock() defer c.Unlock() if c.version == nil { @@ -344,7 +344,7 @@ func (c *Cluster) Version() *semver.Version { return semver.Must(semver.NewVersion(c.version.String())) } -func (c *Cluster) SetVersion(ver *semver.Version) { +func (c *cluster) SetVersion(ver *semver.Version) { c.Lock() defer c.Unlock() if c.version != nil { @@ -401,7 +401,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version { // with the existing cluster. If the validation succeeds, it assigns the IDs // from the existing cluster to the local cluster. // If the validation fails, an error will be returned. -func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error { +func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error { ems := existing.Members() lms := local.Members() if len(ems) != len(lms) { diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index a18f06a78..796e54bbd 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -470,7 +470,7 @@ func TestClusterAddMember(t *testing.T) { } func TestClusterMembers(t *testing.T) { - cls := &Cluster{ + cls := &cluster{ members: map[types.ID]*Member{ 1: &Member{ID: 1}, 20: &Member{ID: 20}, @@ -521,8 +521,8 @@ func TestNodeToMember(t *testing.T) { } } -func newTestCluster(membs []*Member) *Cluster { - c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} +func newTestCluster(membs []*Member) *cluster { + c := &cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} for _, m := range membs { c.members[m.ID] = m } diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 203bbd0f5..8b6889550 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -30,7 +30,7 @@ import ( // isMemberBootstrapped tries to check if the given member has been bootstrapped // in the given cluster. -func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool { +func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), false, tr) if err != nil { return false @@ -51,12 +51,12 @@ func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool { // these URLs. The first URL to provide a response is used. If no URLs provide // a response, or a Cluster cannot be successfully created from a received // response, an error is returned. -func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*Cluster, error) { +func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) { return getClusterFromRemotePeers(urls, true, tr) } // If logerr is true, it prints out more error messages. -func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) { +func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*cluster, error) { cc := &http.Client{ Transport: tr, Timeout: time.Second, @@ -90,14 +90,14 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) ( } continue } - return NewClusterFromMembers("", id, membs), nil + return newClusterFromMembers("", id, membs), nil } return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls") } // getRemotePeerURLs returns peer urls of remote members in the cluster. The // returned list is sorted in ascending lexicographical order. -func getRemotePeerURLs(cl ClusterInfo, local string) []string { +func getRemotePeerURLs(cl Cluster, local string) []string { us := make([]string, 0) for _, m := range cl.Members() { if m.Name == local { @@ -113,7 +113,7 @@ func getRemotePeerURLs(cl ClusterInfo, local string) []string { // The key of the returned map is the member's ID. The value of the returned map // is the semver version string. If it fails to get the version of a member, the key // will be an empty string. -func getVersions(cl ClusterInfo, tr *http.Transport) map[string]string { +func getVersions(cl Cluster, tr *http.Transport) map[string]string { members := cl.Members() vers := make(map[string]string) for _, m := range members { diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 71570fbc7..871af6b3c 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -60,11 +60,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { sec := security.NewStore(server, defaultServerTimeout) kh := &keysHandler{ - sec: sec, - server: server, - clusterInfo: server.Cluster, - timer: server, - timeout: defaultServerTimeout, + sec: sec, + server: server, + cluster: server.Cluster(), + timer: server, + timeout: defaultServerTimeout, } sh := &statsHandler{ @@ -72,19 +72,19 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { } mh := &membersHandler{ - sec: sec, - server: server, - clusterInfo: server.Cluster, - clock: clockwork.NewRealClock(), + sec: sec, + server: server, + cluster: server.Cluster(), + clock: clockwork.NewRealClock(), } dmh := &deprecatedMachinesHandler{ - clusterInfo: server.Cluster, + cluster: server.Cluster(), } sech := &securityHandler{ - sec: sec, - clusterInfo: server.Cluster, + sec: sec, + cluster: server.Cluster(), } mux := http.NewServeMux() @@ -106,11 +106,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { } type keysHandler struct { - sec *security.Store - server etcdserver.Server - clusterInfo etcdserver.ClusterInfo - timer etcdserver.RaftTimer - timeout time.Duration + sec *security.Store + server etcdserver.Server + cluster etcdserver.Cluster + timer etcdserver.RaftTimer + timeout time.Duration } func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -118,7 +118,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) ctx, cancel := context.WithTimeout(context.Background(), h.timeout) defer cancel() @@ -156,22 +156,22 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type deprecatedMachinesHandler struct { - clusterInfo etcdserver.ClusterInfo + cluster etcdserver.Cluster } func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET", "HEAD") { return } - endpoints := h.clusterInfo.ClientURLs() + endpoints := h.cluster.ClientURLs() w.Write([]byte(strings.Join(endpoints, ", "))) } type membersHandler struct { - sec *security.Store - server etcdserver.Server - clusterInfo etcdserver.ClusterInfo - clock clockwork.Clock + sec *security.Store + server etcdserver.Server + cluster etcdserver.Cluster + clock clockwork.Clock } func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -182,7 +182,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout) defer cancel() @@ -191,7 +191,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case "GET": switch trimPrefix(r.URL.Path, membersPrefix) { case "": - mc := newMemberCollection(h.clusterInfo.Members()) + mc := newMemberCollection(h.cluster.Members()) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(mc); err != nil { log.Printf("etcdhttp: %v", err) @@ -202,7 +202,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeError(w, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election")) return } - m := newMember(h.clusterInfo.Member(id)) + m := newMember(h.cluster.Member(id)) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(m); err != nil { log.Printf("etcdhttp: %v", err) diff --git a/etcdserver/etcdhttp/client_security.go b/etcdserver/etcdhttp/client_security.go index f1e3aaf6e..ec6f0bc0d 100644 --- a/etcdserver/etcdhttp/client_security.go +++ b/etcdserver/etcdhttp/client_security.go @@ -28,8 +28,8 @@ import ( ) type securityHandler struct { - sec *security.Store - clusterInfo etcdserver.ClusterInfo + sec *security.Store + cluster etcdserver.Cluster } func hasWriteRootAccess(sec *security.Store, r *http.Request) bool { @@ -140,7 +140,7 @@ func (sh *securityHandler) baseRoles(w http.ResponseWriter, r *http.Request) { writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) w.Header().Set("Content-Type", "application/json") var rolesCollections struct { Roles []string `json:"roles"` @@ -185,7 +185,7 @@ func (sh *securityHandler) forRole(w http.ResponseWriter, r *http.Request, role writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) switch r.Method { case "GET": @@ -245,7 +245,7 @@ func (sh *securityHandler) baseUsers(w http.ResponseWriter, r *http.Request) { writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) w.Header().Set("Content-Type", "application/json") var usersCollections struct { Users []string `json:"users"` @@ -290,7 +290,7 @@ func (sh *securityHandler) forUser(w http.ResponseWriter, r *http.Request, user writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) switch r.Method { case "GET": @@ -360,7 +360,7 @@ func (sh *securityHandler) enableDisable(w http.ResponseWriter, r *http.Request) writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) w.Header().Set("Content-Type", "application/json") isEnabled := sh.sec.SecurityEnabled() switch r.Method { diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go index 2af1a83ef..c5779a117 100644 --- a/etcdserver/etcdhttp/client_test.go +++ b/etcdserver/etcdhttp/client_test.go @@ -564,9 +564,9 @@ func TestServeMembers(t *testing.T) { members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, } h := &membersHandler{ - server: &serverRecorder{}, - clock: clockwork.NewFakeClock(), - clusterInfo: cluster, + server: &serverRecorder{}, + clock: clockwork.NewFakeClock(), + cluster: cluster, } wmc := string(`{"members":[{"id":"c","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]},{"id":"d","name":"","peerURLs":[],"clientURLs":["http://localhost:8081"]}]}`) @@ -617,9 +617,9 @@ func TestServeLeader(t *testing.T) { members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, } h := &membersHandler{ - server: &serverRecorder{}, - clock: clockwork.NewFakeClock(), - clusterInfo: cluster, + server: &serverRecorder{}, + clock: clockwork.NewFakeClock(), + cluster: cluster, } wmc := string(`{"id":"1","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]}`) @@ -669,9 +669,9 @@ func TestServeMembersCreate(t *testing.T) { req.Header.Set("Content-Type", "application/json") s := &serverRecorder{} h := &membersHandler{ - server: s, - clock: clockwork.NewFakeClock(), - clusterInfo: &fakeCluster{id: 1}, + server: s, + clock: clockwork.NewFakeClock(), + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() @@ -687,7 +687,7 @@ func TestServeMembersCreate(t *testing.T) { t.Errorf("content-type = %s, want %s", gct, wct) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -718,8 +718,8 @@ func TestServeMembersDelete(t *testing.T) { } s := &serverRecorder{} h := &membersHandler{ - server: s, - clusterInfo: &fakeCluster{id: 1}, + server: s, + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() @@ -730,7 +730,7 @@ func TestServeMembersDelete(t *testing.T) { t.Errorf("code=%d, want %d", rw.Code, wcode) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -754,9 +754,9 @@ func TestServeMembersUpdate(t *testing.T) { req.Header.Set("Content-Type", "application/json") s := &serverRecorder{} h := &membersHandler{ - server: s, - clock: clockwork.NewFakeClock(), - clusterInfo: &fakeCluster{id: 1}, + server: s, + clock: clockwork.NewFakeClock(), + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() @@ -768,7 +768,7 @@ func TestServeMembersUpdate(t *testing.T) { } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -1046,9 +1046,9 @@ func TestServeMembersFail(t *testing.T) { } for i, tt := range tests { h := &membersHandler{ - server: tt.server, - clusterInfo: &fakeCluster{id: 1}, - clock: clockwork.NewFakeClock(), + server: tt.server, + cluster: &fakeCluster{id: 1}, + clock: clockwork.NewFakeClock(), } rw := httptest.NewRecorder() h.ServeHTTP(rw, tt.req) @@ -1057,7 +1057,7 @@ func TestServeMembersFail(t *testing.T) { } if rw.Code != http.StatusMethodNotAllowed { gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid) } @@ -1141,7 +1141,7 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) { {"POST", http.StatusMethodNotAllowed}, } - m := &deprecatedMachinesHandler{clusterInfo: &etcdserver.Cluster{}} + m := &deprecatedMachinesHandler{cluster: &fakeCluster{}} s := httptest.NewServer(m) defer s.Close() @@ -1170,7 +1170,7 @@ func TestServeMachines(t *testing.T) { if err != nil { t.Fatal(err) } - h := &deprecatedMachinesHandler{clusterInfo: cluster} + h := &deprecatedMachinesHandler{cluster: cluster} h.ServeHTTP(writer, req) w := "http://localhost:8080, http://localhost:8081, http://localhost:8082" if g := writer.Body.String(); g != w { @@ -1424,9 +1424,9 @@ func TestBadServeKeys(t *testing.T) { } for i, tt := range testBadCases { h := &keysHandler{ - timeout: 0, // context times out immediately - server: tt.server, - clusterInfo: &fakeCluster{id: 1}, + timeout: 0, // context times out immediately + server: tt.server, + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() h.ServeHTTP(rw, tt.req) @@ -1435,7 +1435,7 @@ func TestBadServeKeys(t *testing.T) { } if rw.Code != http.StatusMethodNotAllowed { gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid) } @@ -1482,10 +1482,10 @@ func TestServeKeysGood(t *testing.T) { } for i, tt := range tests { h := &keysHandler{ - timeout: time.Hour, - server: server, - timer: &dummyRaftTimer{}, - clusterInfo: &fakeCluster{id: 1}, + timeout: time.Hour, + server: server, + timer: &dummyRaftTimer{}, + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() h.ServeHTTP(rw, tt.req) @@ -1506,10 +1506,10 @@ func TestServeKeysEvent(t *testing.T) { }, } h := &keysHandler{ - timeout: time.Hour, - server: server, - clusterInfo: &fakeCluster{id: 1}, - timer: &dummyRaftTimer{}, + timeout: time.Hour, + server: server, + cluster: &fakeCluster{id: 1}, + timer: &dummyRaftTimer{}, } rw := httptest.NewRecorder() @@ -1528,7 +1528,7 @@ func TestServeKeysEvent(t *testing.T) { t.Errorf("got code=%d, want %d", rw.Code, wcode) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -1550,10 +1550,10 @@ func TestServeKeysWatch(t *testing.T) { }, } h := &keysHandler{ - timeout: time.Hour, - server: server, - clusterInfo: &fakeCluster{id: 1}, - timer: &dummyRaftTimer{}, + timeout: time.Hour, + server: server, + cluster: &fakeCluster{id: 1}, + timer: &dummyRaftTimer{}, } go func() { ec <- &store.Event{ @@ -1578,7 +1578,7 @@ func TestServeKeysWatch(t *testing.T) { t.Errorf("got code=%d, want %d", rw.Code, wcode) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index 25a45d596..a54f687dc 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -28,9 +28,9 @@ const ( ) // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. -func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler { +func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.Handler { mh := &peerMembersHandler{ - clusterInfo: clusterInfo, + cluster: cluster, } mux := http.NewServeMux() @@ -43,20 +43,20 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler } type peerMembersHandler struct { - clusterInfo etcdserver.ClusterInfo + cluster etcdserver.Cluster } func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } - w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) if r.URL.Path != peerMembersPrefix { http.Error(w, "bad path", http.StatusBadRequest) return } - ms := h.clusterInfo.Members() + ms := h.cluster.Members() w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(ms); err != nil { log.Printf("etcdhttp: %v", err) diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go index cff85f35b..68b2f0e70 100644 --- a/etcdserver/etcdhttp/peer_test.go +++ b/etcdserver/etcdhttp/peer_test.go @@ -76,7 +76,7 @@ func TestServeMembersFails(t *testing.T) { } for i, tt := range tests { rw := httptest.NewRecorder() - h := &peerMembersHandler{clusterInfo: nil} + h := &peerMembersHandler{cluster: nil} h.ServeHTTP(rw, &http.Request{Method: tt.method}) if rw.Code != tt.wcode { t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) @@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) { id: 1, members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, } - h := &peerMembersHandler{clusterInfo: cluster} + h := &peerMembersHandler{cluster: cluster} msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) if err != nil { t.Fatal(err) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 2b6b8960b..391a7d576 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -192,7 +192,7 @@ func (r *raftNode) resumeSending() { p.Resume() } -func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { +func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { var err error member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( @@ -231,7 +231,7 @@ func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n r return } -func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -260,7 +260,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Clust return id, cl, n, s, w } -func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term diff --git a/etcdserver/server.go b/etcdserver/server.go index e298b672b..e864b6ea5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -154,7 +154,7 @@ type EtcdServer struct { id types.ID attributes Attributes - Cluster *Cluster + cluster *cluster store store.Store @@ -178,7 +178,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { var n raft.Node var s *raft.MemoryStorage var id types.ID - var cl *Cluster + var cl *cluster // Run the migrations. dataVer, err := version.DetectDataDir(cfg.DataDir) @@ -198,7 +198,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := cfg.VerifyJoinExisting(); err != nil { return nil, err } - cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) if err != nil { return nil, err } @@ -218,7 +218,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := cfg.VerifyBootstrap(); err != nil { return nil, err } - cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) if err != nil { return nil, err } @@ -238,7 +238,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if checkDuplicateURL(urlsmap) { return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) } - if cl, err = NewCluster(cfg.InitialClusterToken, urlsmap); err != nil { + if cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil { return nil, err } } @@ -302,7 +302,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { }, id: id, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - Cluster: cl, + cluster: cl, stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), @@ -379,10 +379,12 @@ func (s *EtcdServer) purgeFile() { func (s *EtcdServer) ID() types.ID { return s.id } +func (s *EtcdServer) Cluster() Cluster { return s.cluster } + func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { - if s.Cluster.IsIDRemoved(types.ID(m.From)) { + if s.cluster.IsIDRemoved(types.ID(m.From)) { log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } @@ -392,7 +394,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { return s.r.Step(ctx, m) } -func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.Cluster.IsIDRemoved(types.ID(id)) } +func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) } func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) } @@ -432,11 +434,11 @@ func (s *EtcdServer) run() { if err := s.store.Recovery(apply.snapshot.Data); err != nil { log.Panicf("recovery store error: %v", err) } - s.Cluster.Recover() + s.cluster.Recover() // recover raft transport s.r.transport.RemoveAllPeers() - for _, m := range s.Cluster.Members() { + for _, m := range s.cluster.Members() { if m.ID == s.ID() { continue } @@ -700,7 +702,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { cancel() switch err { case nil: - log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.Cluster.ID()) + log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.cluster.ID()) return case ErrStopped: log.Printf("etcdserver: aborting publish because server is stopped") @@ -713,7 +715,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { func (s *EtcdServer) send(ms []raftpb.Message) { for i, _ := range ms { - if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) { + if s.cluster.IsIDRemoved(types.ID(ms[i].To)) { ms[i].To = 0 } } @@ -791,10 +793,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { log.Panicf("unmarshal %s should never fail: %v", r.Val, err) } - s.Cluster.UpdateAttributes(id, attr) + s.cluster.UpdateAttributes(id, attr) } if r.Path == path.Join(StoreClusterPrefix, "version") { - s.Cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) } return f(s.store.Set(r.Path, r.Dir, r.Val, expr)) } @@ -819,7 +821,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { - if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { + if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) return false, err @@ -834,21 +836,21 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if cc.NodeID != uint64(m.ID) { log.Panicf("nodeID should always be equal to member ID") } - s.Cluster.AddMember(m) + s.cluster.AddMember(m) if m.ID == s.id { - log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } else { s.r.transport.AddPeer(m.ID, m.PeerURLs) - log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) - s.Cluster.RemoveMember(id) + s.cluster.RemoveMember(id) if id == s.id { return true, nil } else { s.r.transport.RemovePeer(id) - log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) + log.Printf("etcdserver: removed member %s from cluster %s", id, s.cluster.ID()) } case raftpb.ConfChangeUpdateNode: m := new(Member) @@ -858,12 +860,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if cc.NodeID != uint64(m.ID) { log.Panicf("nodeID should always be equal to member ID") } - s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) if m.ID == s.id { - log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } else { s.r.transport.UpdatePeer(m.ID, m.PeerURLs) - log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } } return false, nil @@ -917,10 +919,10 @@ func (s *EtcdServer) PauseSending() { s.r.pauseSending() } func (s *EtcdServer) ResumeSending() { s.r.resumeSending() } func (s *EtcdServer) ClusterVersion() *semver.Version { - if s.Cluster == nil { + if s.cluster == nil { return nil } - return s.Cluster.Version() + return s.cluster.Version() } // monitorVersions checks the member's version every monitorVersion interval. @@ -940,7 +942,7 @@ func (s *EtcdServer) monitorVersions() { continue } - v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport)) + v := decideClusterVersion(getVersions(s.cluster, s.cfg.Transport)) if v != nil { // only keep major.minor version for comparasion v = &semver.Version{ @@ -952,7 +954,7 @@ func (s *EtcdServer) monitorVersions() { // if the current version is nil: // 1. use the decided version if possible // 2. or use the min cluster version - if s.Cluster.Version() == nil { + if s.cluster.Version() == nil { if v != nil { go s.updateClusterVersion(v.String()) } else { @@ -963,17 +965,17 @@ func (s *EtcdServer) monitorVersions() { // update cluster version only if the decided version is greater than // the current cluster version - if v != nil && s.Cluster.Version().LessThan(*v) { + if v != nil && s.cluster.Version().LessThan(*v) { go s.updateClusterVersion(v.String()) } } } func (s *EtcdServer) updateClusterVersion(ver string) { - if s.Cluster.Version() == nil { + if s.cluster.Version() == nil { log.Printf("etcdsever: setting up the initial cluster version to %v", ver) } else { - log.Printf("etcdsever: updating the cluster version from %v to %v", s.Cluster.Version(), ver) + log.Printf("etcdsever: updating the cluster version from %v to %v", s.cluster.Version(), ver) } req := pb.Request{ Method: "PUT", diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a5ac90490..8a922a8d6 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -393,7 +393,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { cl := newTestCluster([]*Member{{ID: 1}}) srv := &EtcdServer{ store: &storeRecorder{}, - Cluster: cl, + cluster: cl, } req := pb.Request{ Method: "PUT", @@ -453,7 +453,7 @@ func TestApplyConfChangeError(t *testing.T) { n := &nodeRecorder{} srv := &EtcdServer{ r: raftNode{Node: n}, - Cluster: cl, + cluster: cl, } _, err := srv.applyConfChange(tt.cc, nil) if err != tt.werr { @@ -484,7 +484,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { Node: &nodeRecorder{}, transport: &nopTransporter{}, }, - Cluster: cl, + cluster: cl, } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, @@ -780,7 +780,7 @@ func TestRecvSnapshot(t *testing.T) { raftStorage: raft.NewMemoryStorage(), }, store: st, - Cluster: cl, + cluster: cl, } s.start() @@ -815,7 +815,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, } s.start() @@ -859,7 +859,7 @@ func TestAddMember(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() @@ -898,7 +898,7 @@ func TestRemoveMember(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() @@ -936,7 +936,7 @@ func TestUpdateMember(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() @@ -969,7 +969,7 @@ func TestPublish(t *testing.T) { id: 1, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, - Cluster: &Cluster{}, + cluster: &cluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1010,7 +1010,7 @@ func TestPublishStopped(t *testing.T) { Node: &nodeRecorder{}, transport: &nopTransporter{}, }, - Cluster: &Cluster{}, + cluster: &cluster{}, w: &waitRecorder{}, done: make(chan struct{}), stop: make(chan struct{}), @@ -1051,7 +1051,7 @@ func TestUpdateVersion(t *testing.T) { id: 1, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, - Cluster: &Cluster{}, + cluster: &cluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1137,7 +1137,7 @@ func TestGetOtherPeerURLs(t *testing.T) { }, } for i, tt := range tests { - cl := NewClusterFromMembers("", types.ID(0), tt.membs) + cl := newClusterFromMembers("", types.ID(0), tt.membs) urls := getRemotePeerURLs(cl, tt.self) if !reflect.DeepEqual(urls, tt.wurls) { t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 453b27407..6c678158f 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -695,7 +695,7 @@ func (m *member) Launch() error { m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() - m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())} + m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())} for _, ln := range m.PeerListeners { hs := &httptest.Server{