diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 68ceb6af6..9925d0839 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -203,7 +203,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Handler: etcdhttp.NewClientHandler(s), Info: cfg.corsInfo, } - ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler()) + ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler()) // Start the peer server in a goroutine for _, l := range plns { go func(l net.Listener) { diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 90868cc1d..3779e84a3 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -39,7 +39,7 @@ const ( attributesSuffix = "attributes" ) -type ClusterInfo interface { +type Cluster interface { // ID returns the cluster ID ID() types.ID // ClientURLs returns an aggregate set of all URLs on which this @@ -56,7 +56,7 @@ type ClusterInfo interface { } // Cluster is a list of Members that belong to the same raft cluster -type Cluster struct { +type cluster struct { id types.ID token string store store.Store @@ -69,9 +69,9 @@ type Cluster struct { removed map[types.ID]bool } -func NewCluster(token string, initial types.URLsMap) (*Cluster, error) { +func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error) { c := newCluster(token) - for name, urls := range initial { + for name, urls := range urlsmap { m := NewMember(name, urls, token, nil) if _, ok := c.members[m.ID]; ok { return nil, fmt.Errorf("member exists with identical ID %v", m) @@ -85,7 +85,7 @@ func NewCluster(token string, initial types.URLsMap) (*Cluster, error) { return c, nil } -func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster { +func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster { c := newCluster(token) c.id = id for _, m := range membs { @@ -94,17 +94,17 @@ func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster return c } -func newCluster(token string) *Cluster { - return &Cluster{ +func newCluster(token string) *cluster { + return &cluster{ token: token, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool), } } -func (c *Cluster) ID() types.ID { return c.id } +func (c *cluster) ID() types.ID { return c.id } -func (c *Cluster) Members() []*Member { +func (c *cluster) Members() []*Member { c.Lock() defer c.Unlock() var sms SortableMemberSlice @@ -115,7 +115,7 @@ func (c *Cluster) Members() []*Member { return []*Member(sms) } -func (c *Cluster) Member(id types.ID) *Member { +func (c *cluster) Member(id types.ID) *Member { c.Lock() defer c.Unlock() return c.members[id].Clone() @@ -123,7 +123,7 @@ func (c *Cluster) Member(id types.ID) *Member { // MemberByName returns a Member with the given name if exists. // If more than one member has the given name, it will panic. -func (c *Cluster) MemberByName(name string) *Member { +func (c *cluster) MemberByName(name string) *Member { c.Lock() defer c.Unlock() var memb *Member @@ -138,7 +138,7 @@ func (c *Cluster) MemberByName(name string) *Member { return memb.Clone() } -func (c *Cluster) MemberIDs() []types.ID { +func (c *cluster) MemberIDs() []types.ID { c.Lock() defer c.Unlock() var ids []types.ID @@ -149,7 +149,7 @@ func (c *Cluster) MemberIDs() []types.ID { return ids } -func (c *Cluster) IsIDRemoved(id types.ID) bool { +func (c *cluster) IsIDRemoved(id types.ID) bool { c.Lock() defer c.Unlock() return c.removed[id] @@ -157,7 +157,7 @@ func (c *Cluster) IsIDRemoved(id types.ID) bool { // PeerURLs returns a list of all peer addresses. // The returned list is sorted in ascending lexicographical order. -func (c *Cluster) PeerURLs() []string { +func (c *cluster) PeerURLs() []string { c.Lock() defer c.Unlock() urls := make([]string, 0) @@ -172,7 +172,7 @@ func (c *Cluster) PeerURLs() []string { // ClientURLs returns a list of all client addresses. // The returned list is sorted in ascending lexicographical order. -func (c *Cluster) ClientURLs() []string { +func (c *cluster) ClientURLs() []string { c.Lock() defer c.Unlock() urls := make([]string, 0) @@ -185,7 +185,7 @@ func (c *Cluster) ClientURLs() []string { return urls } -func (c *Cluster) String() string { +func (c *cluster) String() string { c.Lock() defer c.Unlock() b := &bytes.Buffer{} @@ -203,7 +203,7 @@ func (c *Cluster) String() string { return b.String() } -func (c *Cluster) genID() { +func (c *cluster) genID() { mIDs := c.MemberIDs() b := make([]byte, 8*len(mIDs)) for i, id := range mIDs { @@ -213,18 +213,18 @@ func (c *Cluster) genID() { c.id = types.ID(binary.BigEndian.Uint64(hash[:8])) } -func (c *Cluster) SetID(id types.ID) { c.id = id } +func (c *cluster) SetID(id types.ID) { c.id = id } -func (c *Cluster) SetStore(st store.Store) { c.store = st } +func (c *cluster) SetStore(st store.Store) { c.store = st } -func (c *Cluster) Recover() { +func (c *cluster) Recover() { c.members, c.removed = membersFromStore(c.store) c.version = clusterVersionFromStore(c.store) } // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. -func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { +func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { members, removed := membersFromStore(c.store) id := types.ID(cc.NodeID) if removed[id] { @@ -285,7 +285,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember adds a new Member into the cluster, and saves the given member's // raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. -func (c *Cluster) AddMember(m *Member) { +func (c *cluster) AddMember(m *Member) { c.Lock() defer c.Unlock() b, err := json.Marshal(m.RaftAttributes) @@ -301,7 +301,7 @@ func (c *Cluster) AddMember(m *Member) { // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. -func (c *Cluster) RemoveMember(id types.ID) { +func (c *cluster) RemoveMember(id types.ID) { c.Lock() defer c.Unlock() if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { @@ -314,14 +314,14 @@ func (c *Cluster) RemoveMember(id types.ID) { c.removed[id] = true } -func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) { +func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) { c.Lock() defer c.Unlock() c.members[id].Attributes = attr // TODO: update store in this function } -func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { +func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { c.Lock() defer c.Unlock() b, err := json.Marshal(raftAttr) @@ -335,7 +335,7 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { c.members[id].RaftAttributes = raftAttr } -func (c *Cluster) Version() *semver.Version { +func (c *cluster) Version() *semver.Version { c.Lock() defer c.Unlock() if c.version == nil { @@ -344,7 +344,7 @@ func (c *Cluster) Version() *semver.Version { return semver.Must(semver.NewVersion(c.version.String())) } -func (c *Cluster) SetVersion(ver *semver.Version) { +func (c *cluster) SetVersion(ver *semver.Version) { c.Lock() defer c.Unlock() if c.version != nil { @@ -401,7 +401,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version { // with the existing cluster. If the validation succeeds, it assigns the IDs // from the existing cluster to the local cluster. // If the validation fails, an error will be returned. -func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error { +func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error { ems := existing.Members() lms := local.Members() if len(ems) != len(lms) { diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index a18f06a78..796e54bbd 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -470,7 +470,7 @@ func TestClusterAddMember(t *testing.T) { } func TestClusterMembers(t *testing.T) { - cls := &Cluster{ + cls := &cluster{ members: map[types.ID]*Member{ 1: &Member{ID: 1}, 20: &Member{ID: 20}, @@ -521,8 +521,8 @@ func TestNodeToMember(t *testing.T) { } } -func newTestCluster(membs []*Member) *Cluster { - c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} +func newTestCluster(membs []*Member) *cluster { + c := &cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} for _, m := range membs { c.members[m.ID] = m } diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 203bbd0f5..8b6889550 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -30,7 +30,7 @@ import ( // isMemberBootstrapped tries to check if the given member has been bootstrapped // in the given cluster. -func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool { +func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), false, tr) if err != nil { return false @@ -51,12 +51,12 @@ func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool { // these URLs. The first URL to provide a response is used. If no URLs provide // a response, or a Cluster cannot be successfully created from a received // response, an error is returned. -func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*Cluster, error) { +func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) { return getClusterFromRemotePeers(urls, true, tr) } // If logerr is true, it prints out more error messages. -func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) { +func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*cluster, error) { cc := &http.Client{ Transport: tr, Timeout: time.Second, @@ -90,14 +90,14 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) ( } continue } - return NewClusterFromMembers("", id, membs), nil + return newClusterFromMembers("", id, membs), nil } return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls") } // getRemotePeerURLs returns peer urls of remote members in the cluster. The // returned list is sorted in ascending lexicographical order. -func getRemotePeerURLs(cl ClusterInfo, local string) []string { +func getRemotePeerURLs(cl Cluster, local string) []string { us := make([]string, 0) for _, m := range cl.Members() { if m.Name == local { @@ -113,7 +113,7 @@ func getRemotePeerURLs(cl ClusterInfo, local string) []string { // The key of the returned map is the member's ID. The value of the returned map // is the semver version string. If it fails to get the version of a member, the key // will be an empty string. -func getVersions(cl ClusterInfo, tr *http.Transport) map[string]string { +func getVersions(cl Cluster, tr *http.Transport) map[string]string { members := cl.Members() vers := make(map[string]string) for _, m := range members { diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 133c80fa3..299a1757e 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -62,11 +62,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { sec := security.NewStore(server, defaultServerTimeout) kh := &keysHandler{ - sec: sec, - server: server, - clusterInfo: server.Cluster, - timer: server, - timeout: defaultServerTimeout, + sec: sec, + server: server, + cluster: server.Cluster(), + timer: server, + timeout: defaultServerTimeout, } sh := &statsHandler{ @@ -74,19 +74,19 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { } mh := &membersHandler{ - sec: sec, - server: server, - clusterInfo: server.Cluster, - clock: clockwork.NewRealClock(), + sec: sec, + server: server, + cluster: server.Cluster(), + clock: clockwork.NewRealClock(), } dmh := &deprecatedMachinesHandler{ - clusterInfo: server.Cluster, + cluster: server.Cluster(), } sech := &securityHandler{ - sec: sec, - clusterInfo: server.Cluster, + sec: sec, + cluster: server.Cluster(), } mux := http.NewServeMux() @@ -109,11 +109,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler { } type keysHandler struct { - sec *security.Store - server etcdserver.Server - clusterInfo etcdserver.ClusterInfo - timer etcdserver.RaftTimer - timeout time.Duration + sec *security.Store + server etcdserver.Server + cluster etcdserver.Cluster + timer etcdserver.RaftTimer + timeout time.Duration } func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -121,7 +121,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) ctx, cancel := context.WithTimeout(context.Background(), h.timeout) defer cancel() @@ -159,22 +159,22 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type deprecatedMachinesHandler struct { - clusterInfo etcdserver.ClusterInfo + cluster etcdserver.Cluster } func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET", "HEAD") { return } - endpoints := h.clusterInfo.ClientURLs() + endpoints := h.cluster.ClientURLs() w.Write([]byte(strings.Join(endpoints, ", "))) } type membersHandler struct { - sec *security.Store - server etcdserver.Server - clusterInfo etcdserver.ClusterInfo - clock clockwork.Clock + sec *security.Store + server etcdserver.Server + cluster etcdserver.Cluster + clock clockwork.Clock } func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -185,7 +185,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout) defer cancel() @@ -194,7 +194,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case "GET": switch trimPrefix(r.URL.Path, membersPrefix) { case "": - mc := newMemberCollection(h.clusterInfo.Members()) + mc := newMemberCollection(h.cluster.Members()) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(mc); err != nil { log.Printf("etcdhttp: %v", err) @@ -205,7 +205,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeError(w, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election")) return } - m := newMember(h.clusterInfo.Member(id)) + m := newMember(h.cluster.Member(id)) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(m); err != nil { log.Printf("etcdhttp: %v", err) diff --git a/etcdserver/etcdhttp/client_security.go b/etcdserver/etcdhttp/client_security.go index 8c09a6948..0bbf1a6c4 100644 --- a/etcdserver/etcdhttp/client_security.go +++ b/etcdserver/etcdhttp/client_security.go @@ -28,8 +28,8 @@ import ( ) type securityHandler struct { - sec *security.Store - clusterInfo etcdserver.ClusterInfo + sec *security.Store + cluster etcdserver.Cluster } func hasWriteRootAccess(sec *security.Store, r *http.Request) bool { @@ -140,7 +140,7 @@ func (sh *securityHandler) baseRoles(w http.ResponseWriter, r *http.Request) { writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) w.Header().Set("Content-Type", "application/json") var rolesCollections struct { Roles []string `json:"roles"` @@ -185,7 +185,7 @@ func (sh *securityHandler) forRole(w http.ResponseWriter, r *http.Request, role writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) switch r.Method { case "GET": @@ -245,7 +245,7 @@ func (sh *securityHandler) baseUsers(w http.ResponseWriter, r *http.Request) { writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) w.Header().Set("Content-Type", "application/json") var usersCollections struct { Users []string `json:"users"` @@ -290,7 +290,7 @@ func (sh *securityHandler) forUser(w http.ResponseWriter, r *http.Request, user writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) switch r.Method { case "GET": @@ -360,7 +360,7 @@ func (sh *securityHandler) enableDisable(w http.ResponseWriter, r *http.Request) writeNoAuth(w) return } - w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String()) w.Header().Set("Content-Type", "application/json") isEnabled := sh.sec.SecurityEnabled() switch r.Method { diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go index 2af1a83ef..c5779a117 100644 --- a/etcdserver/etcdhttp/client_test.go +++ b/etcdserver/etcdhttp/client_test.go @@ -564,9 +564,9 @@ func TestServeMembers(t *testing.T) { members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, } h := &membersHandler{ - server: &serverRecorder{}, - clock: clockwork.NewFakeClock(), - clusterInfo: cluster, + server: &serverRecorder{}, + clock: clockwork.NewFakeClock(), + cluster: cluster, } wmc := string(`{"members":[{"id":"c","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]},{"id":"d","name":"","peerURLs":[],"clientURLs":["http://localhost:8081"]}]}`) @@ -617,9 +617,9 @@ func TestServeLeader(t *testing.T) { members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, } h := &membersHandler{ - server: &serverRecorder{}, - clock: clockwork.NewFakeClock(), - clusterInfo: cluster, + server: &serverRecorder{}, + clock: clockwork.NewFakeClock(), + cluster: cluster, } wmc := string(`{"id":"1","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]}`) @@ -669,9 +669,9 @@ func TestServeMembersCreate(t *testing.T) { req.Header.Set("Content-Type", "application/json") s := &serverRecorder{} h := &membersHandler{ - server: s, - clock: clockwork.NewFakeClock(), - clusterInfo: &fakeCluster{id: 1}, + server: s, + clock: clockwork.NewFakeClock(), + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() @@ -687,7 +687,7 @@ func TestServeMembersCreate(t *testing.T) { t.Errorf("content-type = %s, want %s", gct, wct) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -718,8 +718,8 @@ func TestServeMembersDelete(t *testing.T) { } s := &serverRecorder{} h := &membersHandler{ - server: s, - clusterInfo: &fakeCluster{id: 1}, + server: s, + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() @@ -730,7 +730,7 @@ func TestServeMembersDelete(t *testing.T) { t.Errorf("code=%d, want %d", rw.Code, wcode) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -754,9 +754,9 @@ func TestServeMembersUpdate(t *testing.T) { req.Header.Set("Content-Type", "application/json") s := &serverRecorder{} h := &membersHandler{ - server: s, - clock: clockwork.NewFakeClock(), - clusterInfo: &fakeCluster{id: 1}, + server: s, + clock: clockwork.NewFakeClock(), + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() @@ -768,7 +768,7 @@ func TestServeMembersUpdate(t *testing.T) { } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -1046,9 +1046,9 @@ func TestServeMembersFail(t *testing.T) { } for i, tt := range tests { h := &membersHandler{ - server: tt.server, - clusterInfo: &fakeCluster{id: 1}, - clock: clockwork.NewFakeClock(), + server: tt.server, + cluster: &fakeCluster{id: 1}, + clock: clockwork.NewFakeClock(), } rw := httptest.NewRecorder() h.ServeHTTP(rw, tt.req) @@ -1057,7 +1057,7 @@ func TestServeMembersFail(t *testing.T) { } if rw.Code != http.StatusMethodNotAllowed { gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid) } @@ -1141,7 +1141,7 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) { {"POST", http.StatusMethodNotAllowed}, } - m := &deprecatedMachinesHandler{clusterInfo: &etcdserver.Cluster{}} + m := &deprecatedMachinesHandler{cluster: &fakeCluster{}} s := httptest.NewServer(m) defer s.Close() @@ -1170,7 +1170,7 @@ func TestServeMachines(t *testing.T) { if err != nil { t.Fatal(err) } - h := &deprecatedMachinesHandler{clusterInfo: cluster} + h := &deprecatedMachinesHandler{cluster: cluster} h.ServeHTTP(writer, req) w := "http://localhost:8080, http://localhost:8081, http://localhost:8082" if g := writer.Body.String(); g != w { @@ -1424,9 +1424,9 @@ func TestBadServeKeys(t *testing.T) { } for i, tt := range testBadCases { h := &keysHandler{ - timeout: 0, // context times out immediately - server: tt.server, - clusterInfo: &fakeCluster{id: 1}, + timeout: 0, // context times out immediately + server: tt.server, + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() h.ServeHTTP(rw, tt.req) @@ -1435,7 +1435,7 @@ func TestBadServeKeys(t *testing.T) { } if rw.Code != http.StatusMethodNotAllowed { gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid) } @@ -1482,10 +1482,10 @@ func TestServeKeysGood(t *testing.T) { } for i, tt := range tests { h := &keysHandler{ - timeout: time.Hour, - server: server, - timer: &dummyRaftTimer{}, - clusterInfo: &fakeCluster{id: 1}, + timeout: time.Hour, + server: server, + timer: &dummyRaftTimer{}, + cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() h.ServeHTTP(rw, tt.req) @@ -1506,10 +1506,10 @@ func TestServeKeysEvent(t *testing.T) { }, } h := &keysHandler{ - timeout: time.Hour, - server: server, - clusterInfo: &fakeCluster{id: 1}, - timer: &dummyRaftTimer{}, + timeout: time.Hour, + server: server, + cluster: &fakeCluster{id: 1}, + timer: &dummyRaftTimer{}, } rw := httptest.NewRecorder() @@ -1528,7 +1528,7 @@ func TestServeKeysEvent(t *testing.T) { t.Errorf("got code=%d, want %d", rw.Code, wcode) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } @@ -1550,10 +1550,10 @@ func TestServeKeysWatch(t *testing.T) { }, } h := &keysHandler{ - timeout: time.Hour, - server: server, - clusterInfo: &fakeCluster{id: 1}, - timer: &dummyRaftTimer{}, + timeout: time.Hour, + server: server, + cluster: &fakeCluster{id: 1}, + timer: &dummyRaftTimer{}, } go func() { ec <- &store.Event{ @@ -1578,7 +1578,7 @@ func TestServeKeysWatch(t *testing.T) { t.Errorf("got code=%d, want %d", rw.Code, wcode) } gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := h.clusterInfo.ID().String() + wcid := h.cluster.ID().String() if gcid != wcid { t.Errorf("cid = %s, want %s", gcid, wcid) } diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index 25a45d596..a54f687dc 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -28,9 +28,9 @@ const ( ) // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. -func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler { +func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.Handler { mh := &peerMembersHandler{ - clusterInfo: clusterInfo, + cluster: cluster, } mux := http.NewServeMux() @@ -43,20 +43,20 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler } type peerMembersHandler struct { - clusterInfo etcdserver.ClusterInfo + cluster etcdserver.Cluster } func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } - w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) if r.URL.Path != peerMembersPrefix { http.Error(w, "bad path", http.StatusBadRequest) return } - ms := h.clusterInfo.Members() + ms := h.cluster.Members() w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(ms); err != nil { log.Printf("etcdhttp: %v", err) diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go index cff85f35b..68b2f0e70 100644 --- a/etcdserver/etcdhttp/peer_test.go +++ b/etcdserver/etcdhttp/peer_test.go @@ -76,7 +76,7 @@ func TestServeMembersFails(t *testing.T) { } for i, tt := range tests { rw := httptest.NewRecorder() - h := &peerMembersHandler{clusterInfo: nil} + h := &peerMembersHandler{cluster: nil} h.ServeHTTP(rw, &http.Request{Method: tt.method}) if rw.Code != tt.wcode { t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) @@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) { id: 1, members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, } - h := &peerMembersHandler{clusterInfo: cluster} + h := &peerMembersHandler{cluster: cluster} msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) if err != nil { t.Fatal(err) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 2b6b8960b..391a7d576 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -192,7 +192,7 @@ func (r *raftNode) resumeSending() { p.Resume() } -func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { +func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { var err error member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( @@ -231,7 +231,7 @@ func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n r return } -func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -260,7 +260,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Clust return id, cl, n, s, w } -func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term diff --git a/etcdserver/server.go b/etcdserver/server.go index e298b672b..e864b6ea5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -154,7 +154,7 @@ type EtcdServer struct { id types.ID attributes Attributes - Cluster *Cluster + cluster *cluster store store.Store @@ -178,7 +178,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { var n raft.Node var s *raft.MemoryStorage var id types.ID - var cl *Cluster + var cl *cluster // Run the migrations. dataVer, err := version.DetectDataDir(cfg.DataDir) @@ -198,7 +198,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := cfg.VerifyJoinExisting(); err != nil { return nil, err } - cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) if err != nil { return nil, err } @@ -218,7 +218,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err := cfg.VerifyBootstrap(); err != nil { return nil, err } - cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) if err != nil { return nil, err } @@ -238,7 +238,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if checkDuplicateURL(urlsmap) { return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) } - if cl, err = NewCluster(cfg.InitialClusterToken, urlsmap); err != nil { + if cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil { return nil, err } } @@ -302,7 +302,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { }, id: id, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - Cluster: cl, + cluster: cl, stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), @@ -379,10 +379,12 @@ func (s *EtcdServer) purgeFile() { func (s *EtcdServer) ID() types.ID { return s.id } +func (s *EtcdServer) Cluster() Cluster { return s.cluster } + func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { - if s.Cluster.IsIDRemoved(types.ID(m.From)) { + if s.cluster.IsIDRemoved(types.ID(m.From)) { log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } @@ -392,7 +394,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { return s.r.Step(ctx, m) } -func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.Cluster.IsIDRemoved(types.ID(id)) } +func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) } func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) } @@ -432,11 +434,11 @@ func (s *EtcdServer) run() { if err := s.store.Recovery(apply.snapshot.Data); err != nil { log.Panicf("recovery store error: %v", err) } - s.Cluster.Recover() + s.cluster.Recover() // recover raft transport s.r.transport.RemoveAllPeers() - for _, m := range s.Cluster.Members() { + for _, m := range s.cluster.Members() { if m.ID == s.ID() { continue } @@ -700,7 +702,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { cancel() switch err { case nil: - log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.Cluster.ID()) + log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.cluster.ID()) return case ErrStopped: log.Printf("etcdserver: aborting publish because server is stopped") @@ -713,7 +715,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { func (s *EtcdServer) send(ms []raftpb.Message) { for i, _ := range ms { - if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) { + if s.cluster.IsIDRemoved(types.ID(ms[i].To)) { ms[i].To = 0 } } @@ -791,10 +793,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { log.Panicf("unmarshal %s should never fail: %v", r.Val, err) } - s.Cluster.UpdateAttributes(id, attr) + s.cluster.UpdateAttributes(id, attr) } if r.Path == path.Join(StoreClusterPrefix, "version") { - s.Cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) } return f(s.store.Set(r.Path, r.Dir, r.Val, expr)) } @@ -819,7 +821,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { - if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { + if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) return false, err @@ -834,21 +836,21 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if cc.NodeID != uint64(m.ID) { log.Panicf("nodeID should always be equal to member ID") } - s.Cluster.AddMember(m) + s.cluster.AddMember(m) if m.ID == s.id { - log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } else { s.r.transport.AddPeer(m.ID, m.PeerURLs) - log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) - s.Cluster.RemoveMember(id) + s.cluster.RemoveMember(id) if id == s.id { return true, nil } else { s.r.transport.RemovePeer(id) - log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) + log.Printf("etcdserver: removed member %s from cluster %s", id, s.cluster.ID()) } case raftpb.ConfChangeUpdateNode: m := new(Member) @@ -858,12 +860,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if cc.NodeID != uint64(m.ID) { log.Panicf("nodeID should always be equal to member ID") } - s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) if m.ID == s.id { - log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } else { s.r.transport.UpdatePeer(m.ID, m.PeerURLs) - log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) + log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID()) } } return false, nil @@ -917,10 +919,10 @@ func (s *EtcdServer) PauseSending() { s.r.pauseSending() } func (s *EtcdServer) ResumeSending() { s.r.resumeSending() } func (s *EtcdServer) ClusterVersion() *semver.Version { - if s.Cluster == nil { + if s.cluster == nil { return nil } - return s.Cluster.Version() + return s.cluster.Version() } // monitorVersions checks the member's version every monitorVersion interval. @@ -940,7 +942,7 @@ func (s *EtcdServer) monitorVersions() { continue } - v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport)) + v := decideClusterVersion(getVersions(s.cluster, s.cfg.Transport)) if v != nil { // only keep major.minor version for comparasion v = &semver.Version{ @@ -952,7 +954,7 @@ func (s *EtcdServer) monitorVersions() { // if the current version is nil: // 1. use the decided version if possible // 2. or use the min cluster version - if s.Cluster.Version() == nil { + if s.cluster.Version() == nil { if v != nil { go s.updateClusterVersion(v.String()) } else { @@ -963,17 +965,17 @@ func (s *EtcdServer) monitorVersions() { // update cluster version only if the decided version is greater than // the current cluster version - if v != nil && s.Cluster.Version().LessThan(*v) { + if v != nil && s.cluster.Version().LessThan(*v) { go s.updateClusterVersion(v.String()) } } } func (s *EtcdServer) updateClusterVersion(ver string) { - if s.Cluster.Version() == nil { + if s.cluster.Version() == nil { log.Printf("etcdsever: setting up the initial cluster version to %v", ver) } else { - log.Printf("etcdsever: updating the cluster version from %v to %v", s.Cluster.Version(), ver) + log.Printf("etcdsever: updating the cluster version from %v to %v", s.cluster.Version(), ver) } req := pb.Request{ Method: "PUT", diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a5ac90490..8a922a8d6 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -393,7 +393,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { cl := newTestCluster([]*Member{{ID: 1}}) srv := &EtcdServer{ store: &storeRecorder{}, - Cluster: cl, + cluster: cl, } req := pb.Request{ Method: "PUT", @@ -453,7 +453,7 @@ func TestApplyConfChangeError(t *testing.T) { n := &nodeRecorder{} srv := &EtcdServer{ r: raftNode{Node: n}, - Cluster: cl, + cluster: cl, } _, err := srv.applyConfChange(tt.cc, nil) if err != tt.werr { @@ -484,7 +484,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { Node: &nodeRecorder{}, transport: &nopTransporter{}, }, - Cluster: cl, + cluster: cl, } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, @@ -780,7 +780,7 @@ func TestRecvSnapshot(t *testing.T) { raftStorage: raft.NewMemoryStorage(), }, store: st, - Cluster: cl, + cluster: cl, } s.start() @@ -815,7 +815,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, } s.start() @@ -859,7 +859,7 @@ func TestAddMember(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() @@ -898,7 +898,7 @@ func TestRemoveMember(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() @@ -936,7 +936,7 @@ func TestUpdateMember(t *testing.T) { transport: &nopTransporter{}, }, store: st, - Cluster: cl, + cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() @@ -969,7 +969,7 @@ func TestPublish(t *testing.T) { id: 1, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, - Cluster: &Cluster{}, + cluster: &cluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1010,7 +1010,7 @@ func TestPublishStopped(t *testing.T) { Node: &nodeRecorder{}, transport: &nopTransporter{}, }, - Cluster: &Cluster{}, + cluster: &cluster{}, w: &waitRecorder{}, done: make(chan struct{}), stop: make(chan struct{}), @@ -1051,7 +1051,7 @@ func TestUpdateVersion(t *testing.T) { id: 1, r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, - Cluster: &Cluster{}, + cluster: &cluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1137,7 +1137,7 @@ func TestGetOtherPeerURLs(t *testing.T) { }, } for i, tt := range tests { - cl := NewClusterFromMembers("", types.ID(0), tt.membs) + cl := newClusterFromMembers("", types.ID(0), tt.membs) urls := getRemotePeerURLs(cl, tt.self) if !reflect.DeepEqual(urls, tt.wurls) { t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 453b27407..6c678158f 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -695,7 +695,7 @@ func (m *member) Launch() error { m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() - m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())} + m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())} for _, ln := range m.PeerListeners { hs := &httptest.Server{