etcdserver: stop exposing Cluster struct

After this PR, only the cluster's interface, Cluster, is exposed, which makes
the code much cleaner. It also prevents external packages from relying on the
cluster struct in the future.
Yicheng Qin 2015-05-12 17:22:06 -07:00
parent f4c51cb5a1
commit a6a649f1c3
13 changed files with 168 additions and 166 deletions
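
The shape of the change, distilled into a minimal sketch (the real Cluster interface declares more methods than shown here; everything below is taken from the hunks that follow):

    // cluster.go: the exported name becomes an interface...
    type Cluster interface {
            ID() types.ID
            ClientURLs() []string
            Members() []*Member
            // ... more methods elided
    }

    // ...and the concrete type becomes unexported.
    type cluster struct {
            id      types.ID
            token   string
            members map[types.ID]*Member
            removed map[types.ID]bool
            // ...
    }

    // server.go: callers reach the cluster only through an
    // interface-returning accessor, not an exported struct field.
    func (s *EtcdServer) Cluster() Cluster { return s.cluster }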

View File

@@ -203,7 +203,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Handler: etcdhttp.NewClientHandler(s),
Info: cfg.corsInfo,
}
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())
// Start the peer server in a goroutine
for _, l := range plns {
go func(l net.Listener) {

View File

@@ -39,7 +39,7 @@ const (
attributesSuffix = "attributes"
)
type ClusterInfo interface {
type Cluster interface {
// ID returns the cluster ID
ID() types.ID
// ClientURLs returns an aggregate set of all URLs on which this
@@ -56,7 +56,7 @@ type ClusterInfo interface {
}
// Cluster is a list of Members that belong to the same raft cluster
type Cluster struct {
type cluster struct {
id types.ID
token string
store store.Store
@@ -69,9 +69,9 @@ type Cluster struct {
removed map[types.ID]bool
}
func NewCluster(token string, initial types.URLsMap) (*Cluster, error) {
func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error) {
c := newCluster(token)
for name, urls := range initial {
for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
return nil, fmt.Errorf("member exists with identical ID %v", m)
@@ -85,7 +85,7 @@ func NewCluster(token string, initial types.URLsMap) (*Cluster, error) {
return c, nil
}
func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster {
c := newCluster(token)
c.id = id
for _, m := range membs {
@@ -94,17 +94,17 @@ func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster
return c
}
func newCluster(token string) *Cluster {
return &Cluster{
func newCluster(token string) *cluster {
return &cluster{
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
}
}
func (c *Cluster) ID() types.ID { return c.id }
func (c *cluster) ID() types.ID { return c.id }
func (c *Cluster) Members() []*Member {
func (c *cluster) Members() []*Member {
c.Lock()
defer c.Unlock()
var sms SortableMemberSlice
@@ -115,7 +115,7 @@ func (c *Cluster) Members() []*Member {
return []*Member(sms)
}
func (c *Cluster) Member(id types.ID) *Member {
func (c *cluster) Member(id types.ID) *Member {
c.Lock()
defer c.Unlock()
return c.members[id].Clone()
@@ -123,7 +123,7 @@ func (c *Cluster) Member(id types.ID) *Member {
// MemberByName returns a Member with the given name if exists.
// If more than one member has the given name, it will panic.
func (c *Cluster) MemberByName(name string) *Member {
func (c *cluster) MemberByName(name string) *Member {
c.Lock()
defer c.Unlock()
var memb *Member
@@ -138,7 +138,7 @@ func (c *Cluster) MemberByName(name string) *Member {
return memb.Clone()
}
func (c *Cluster) MemberIDs() []types.ID {
func (c *cluster) MemberIDs() []types.ID {
c.Lock()
defer c.Unlock()
var ids []types.ID
@@ -149,7 +149,7 @@ func (c *Cluster) MemberIDs() []types.ID {
return ids
}
func (c *Cluster) IsIDRemoved(id types.ID) bool {
func (c *cluster) IsIDRemoved(id types.ID) bool {
c.Lock()
defer c.Unlock()
return c.removed[id]
@@ -157,7 +157,7 @@ func (c *Cluster) IsIDRemoved(id types.ID) bool {
// PeerURLs returns a list of all peer addresses.
// The returned list is sorted in ascending lexicographical order.
func (c *Cluster) PeerURLs() []string {
func (c *cluster) PeerURLs() []string {
c.Lock()
defer c.Unlock()
urls := make([]string, 0)
@@ -172,7 +172,7 @@ func (c *Cluster) PeerURLs() []string {
// ClientURLs returns a list of all client addresses.
// The returned list is sorted in ascending lexicographical order.
func (c *Cluster) ClientURLs() []string {
func (c *cluster) ClientURLs() []string {
c.Lock()
defer c.Unlock()
urls := make([]string, 0)
@@ -185,7 +185,7 @@ func (c *Cluster) ClientURLs() []string {
return urls
}
func (c *Cluster) String() string {
func (c *cluster) String() string {
c.Lock()
defer c.Unlock()
b := &bytes.Buffer{}
@@ -203,7 +203,7 @@ func (c *Cluster) String() string {
return b.String()
}
func (c *Cluster) genID() {
func (c *cluster) genID() {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))
for i, id := range mIDs {
@@ -213,18 +213,18 @@ func (c *Cluster) genID() {
c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
}
func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) Recover() {
func (c *cluster) Recover() {
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
}
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
members, removed := membersFromStore(c.store)
id := types.ID(cc.NodeID)
if removed[id] {
@@ -285,7 +285,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// AddMember adds a new Member into the cluster, and saves the given member's
// raftAttributes into the store. The given member should have empty attributes.
// A Member with a matching id must not exist.
func (c *Cluster) AddMember(m *Member) {
func (c *cluster) AddMember(m *Member) {
c.Lock()
defer c.Unlock()
b, err := json.Marshal(m.RaftAttributes)
@@ -301,7 +301,7 @@ func (c *Cluster) AddMember(m *Member) {
// RemoveMember removes a member from the store.
// The given id MUST exist, or the function panics.
func (c *Cluster) RemoveMember(id types.ID) {
func (c *cluster) RemoveMember(id types.ID) {
c.Lock()
defer c.Unlock()
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
@@ -314,14 +314,14 @@ func (c *Cluster) RemoveMember(id types.ID) {
c.removed[id] = true
}
func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) {
c.Lock()
defer c.Unlock()
c.members[id].Attributes = attr
// TODO: update store in this function
}
func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.Lock()
defer c.Unlock()
b, err := json.Marshal(raftAttr)
@@ -335,7 +335,7 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.members[id].RaftAttributes = raftAttr
}
func (c *Cluster) Version() *semver.Version {
func (c *cluster) Version() *semver.Version {
c.Lock()
defer c.Unlock()
if c.version == nil {
@@ -344,7 +344,7 @@ func (c *Cluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String()))
}
func (c *Cluster) SetVersion(ver *semver.Version) {
func (c *cluster) SetVersion(ver *semver.Version) {
c.Lock()
defer c.Unlock()
if c.version != nil {
@@ -401,7 +401,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version {
// with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster.
// If the validation fails, an error will be returned.
func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error {
func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
ems := existing.Members()
lms := local.Members()
if len(ems) != len(lms) {

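Note that the renamed constructors and ValidateClusterAndAssignIDs now traffic in the unexported *cluster, so despite its exported name the validator is effectively usable only within package etcdserver. A bootstrap sketch using the renamed constructor (the token and URLs are made-up values, and the use of types.NewURLsMap to parse an initial-cluster string is an assumption based on the flags code):

    // types.NewURLsMap parses a name=url,name=url initial-cluster string (assumed helper from pkg/types).
    urlsmap, err := types.NewURLsMap("infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380")
    if err != nil {
            log.Fatal(err)
    }
    // newClusterFromURLsMap returns the unexported *cluster.
    cl, err := newClusterFromURLsMap("etcd-cluster-token", urlsmap)
    if err != nil {
            log.Fatal(err)
    }
    log.Printf("local cluster members: %v", cl.MemberIDs())
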
View File

@@ -470,7 +470,7 @@ func TestClusterAddMember(t *testing.T) {
}
func TestClusterMembers(t *testing.T) {
cls := &Cluster{
cls := &cluster{
members: map[types.ID]*Member{
1: &Member{ID: 1},
20: &Member{ID: 20},
@@ -521,8 +521,8 @@ func TestNodeToMember(t *testing.T) {
}
}
func newTestCluster(membs []*Member) *Cluster {
c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
func newTestCluster(membs []*Member) *cluster {
c := &cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
for _, m := range membs {
c.members[m.ID] = m
}

View File

@@ -30,7 +30,7 @@ import (
// isMemberBootstrapped tries to check if the given member has been bootstrapped
// in the given cluster.
func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool {
func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), false, tr)
if err != nil {
return false
@@ -51,12 +51,12 @@ func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool {
// these URLs. The first URL to provide a response is used. If no URLs provide
// a response, or a Cluster cannot be successfully created from a received
// response, an error is returned.
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*Cluster, error) {
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) {
return getClusterFromRemotePeers(urls, true, tr)
}
// If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) {
func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*cluster, error) {
cc := &http.Client{
Transport: tr,
Timeout: time.Second,
@@ -90,14 +90,14 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
}
continue
}
return NewClusterFromMembers("", id, membs), nil
return newClusterFromMembers("", id, membs), nil
}
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
}
// getRemotePeerURLs returns peer urls of remote members in the cluster. The
// returned list is sorted in ascending lexicographical order.
func getRemotePeerURLs(cl ClusterInfo, local string) []string {
func getRemotePeerURLs(cl Cluster, local string) []string {
us := make([]string, 0)
for _, m := range cl.Members() {
if m.Name == local {
@@ -113,7 +113,7 @@ func getRemotePeerURLs(cl ClusterInfo, local string) []string {
// The key of the returned map is the member's ID. The value of the returned map
// is the semver version string. If it fails to get the version of a member, the key
// will be an empty string.
func getVersions(cl ClusterInfo, tr *http.Transport) map[string]string {
func getVersions(cl Cluster, tr *http.Transport) map[string]string {
members := cl.Members()
vers := make(map[string]string)
for _, m := range members {

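The exported entry point GetClusterFromRemotePeers keeps its name, so a joining member can still discover the existing cluster over HTTP. A usage sketch, with a hypothetical peer URL and a bare transport:

    tr := &http.Transport{}
    cl, err := GetClusterFromRemotePeers([]string{"http://10.0.1.10:2380"}, tr)
    if err != nil {
            log.Fatalf("cannot fetch cluster info: %v", err)
    }
    // Members() is sorted; each Member carries its raft peer URLs and client URLs.
    for _, m := range cl.Members() {
            log.Printf("remote member %s: peers=%v clients=%v", m.ID, m.PeerURLs, m.ClientURLs)
    }
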
View File

@@ -60,11 +60,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
sec := security.NewStore(server, defaultServerTimeout)
kh := &keysHandler{
sec: sec,
server: server,
clusterInfo: server.Cluster,
timer: server,
timeout: defaultServerTimeout,
sec: sec,
server: server,
cluster: server.Cluster(),
timer: server,
timeout: defaultServerTimeout,
}
sh := &statsHandler{
@@ -72,19 +72,19 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
}
mh := &membersHandler{
sec: sec,
server: server,
clusterInfo: server.Cluster,
clock: clockwork.NewRealClock(),
sec: sec,
server: server,
cluster: server.Cluster(),
clock: clockwork.NewRealClock(),
}
dmh := &deprecatedMachinesHandler{
clusterInfo: server.Cluster,
cluster: server.Cluster(),
}
sech := &securityHandler{
sec: sec,
clusterInfo: server.Cluster,
sec: sec,
cluster: server.Cluster(),
}
mux := http.NewServeMux()
@@ -106,11 +106,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
}
type keysHandler struct {
sec *security.Store
server etcdserver.Server
clusterInfo etcdserver.ClusterInfo
timer etcdserver.RaftTimer
timeout time.Duration
sec *security.Store
server etcdserver.Server
cluster etcdserver.Cluster
timer etcdserver.RaftTimer
timeout time.Duration
}
func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -118,7 +118,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
@@ -156,22 +156,22 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
type deprecatedMachinesHandler struct {
clusterInfo etcdserver.ClusterInfo
cluster etcdserver.Cluster
}
func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET", "HEAD") {
return
}
endpoints := h.clusterInfo.ClientURLs()
endpoints := h.cluster.ClientURLs()
w.Write([]byte(strings.Join(endpoints, ", ")))
}
type membersHandler struct {
sec *security.Store
server etcdserver.Server
clusterInfo etcdserver.ClusterInfo
clock clockwork.Clock
sec *security.Store
server etcdserver.Server
cluster etcdserver.Cluster
clock clockwork.Clock
}
func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -182,7 +182,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
defer cancel()
@@ -191,7 +191,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case "GET":
switch trimPrefix(r.URL.Path, membersPrefix) {
case "":
mc := newMemberCollection(h.clusterInfo.Members())
mc := newMemberCollection(h.cluster.Members())
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(mc); err != nil {
log.Printf("etcdhttp: %v", err)
@@ -202,7 +202,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeError(w, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election"))
return
}
m := newMember(h.clusterInfo.Member(id))
m := newMember(h.cluster.Member(id))
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(m); err != nil {
log.Printf("etcdhttp: %v", err)

View File

@@ -28,8 +28,8 @@ import (
)
type securityHandler struct {
sec *security.Store
clusterInfo etcdserver.ClusterInfo
sec *security.Store
cluster etcdserver.Cluster
}
func hasWriteRootAccess(sec *security.Store, r *http.Request) bool {
@@ -140,7 +140,7 @@ func (sh *securityHandler) baseRoles(w http.ResponseWriter, r *http.Request) {
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
w.Header().Set("Content-Type", "application/json")
var rolesCollections struct {
Roles []string `json:"roles"`
@@ -185,7 +185,7 @@ func (sh *securityHandler) forRole(w http.ResponseWriter, r *http.Request, role
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
switch r.Method {
case "GET":
@@ -245,7 +245,7 @@ func (sh *securityHandler) baseUsers(w http.ResponseWriter, r *http.Request) {
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
w.Header().Set("Content-Type", "application/json")
var usersCollections struct {
Users []string `json:"users"`
@@ -290,7 +290,7 @@ func (sh *securityHandler) forUser(w http.ResponseWriter, r *http.Request, user
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
switch r.Method {
case "GET":
@@ -360,7 +360,7 @@ func (sh *securityHandler) enableDisable(w http.ResponseWriter, r *http.Request)
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
w.Header().Set("Content-Type", "application/json")
isEnabled := sh.sec.SecurityEnabled()
switch r.Method {

View File

@@ -564,9 +564,9 @@ func TestServeMembers(t *testing.T) {
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &membersHandler{
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
clusterInfo: cluster,
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
cluster: cluster,
}
wmc := string(`{"members":[{"id":"c","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]},{"id":"d","name":"","peerURLs":[],"clientURLs":["http://localhost:8081"]}]}`)
@@ -617,9 +617,9 @@ func TestServeLeader(t *testing.T) {
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &membersHandler{
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
clusterInfo: cluster,
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
cluster: cluster,
}
wmc := string(`{"id":"1","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]}`)
@@ -669,9 +669,9 @@ func TestServeMembersCreate(t *testing.T) {
req.Header.Set("Content-Type", "application/json")
s := &serverRecorder{}
h := &membersHandler{
server: s,
clock: clockwork.NewFakeClock(),
clusterInfo: &fakeCluster{id: 1},
server: s,
clock: clockwork.NewFakeClock(),
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
@@ -687,7 +687,7 @@ func TestServeMembersCreate(t *testing.T) {
t.Errorf("content-type = %s, want %s", gct, wct)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@@ -718,8 +718,8 @@ func TestServeMembersDelete(t *testing.T) {
}
s := &serverRecorder{}
h := &membersHandler{
server: s,
clusterInfo: &fakeCluster{id: 1},
server: s,
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
@@ -730,7 +730,7 @@ func TestServeMembersDelete(t *testing.T) {
t.Errorf("code=%d, want %d", rw.Code, wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@@ -754,9 +754,9 @@ func TestServeMembersUpdate(t *testing.T) {
req.Header.Set("Content-Type", "application/json")
s := &serverRecorder{}
h := &membersHandler{
server: s,
clock: clockwork.NewFakeClock(),
clusterInfo: &fakeCluster{id: 1},
server: s,
clock: clockwork.NewFakeClock(),
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
@@ -768,7 +768,7 @@ func TestServeMembersUpdate(t *testing.T) {
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@@ -1046,9 +1046,9 @@ func TestServeMembersFail(t *testing.T) {
}
for i, tt := range tests {
h := &membersHandler{
server: tt.server,
clusterInfo: &fakeCluster{id: 1},
clock: clockwork.NewFakeClock(),
server: tt.server,
cluster: &fakeCluster{id: 1},
clock: clockwork.NewFakeClock(),
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, tt.req)
@@ -1057,7 +1057,7 @@ func TestServeMembersFail(t *testing.T) {
}
if rw.Code != http.StatusMethodNotAllowed {
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
}
@@ -1141,7 +1141,7 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
{"POST", http.StatusMethodNotAllowed},
}
m := &deprecatedMachinesHandler{clusterInfo: &etcdserver.Cluster{}}
m := &deprecatedMachinesHandler{cluster: &fakeCluster{}}
s := httptest.NewServer(m)
defer s.Close()
@@ -1170,7 +1170,7 @@ func TestServeMachines(t *testing.T) {
if err != nil {
t.Fatal(err)
}
h := &deprecatedMachinesHandler{clusterInfo: cluster}
h := &deprecatedMachinesHandler{cluster: cluster}
h.ServeHTTP(writer, req)
w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
if g := writer.Body.String(); g != w {
@@ -1424,9 +1424,9 @@ func TestBadServeKeys(t *testing.T) {
}
for i, tt := range testBadCases {
h := &keysHandler{
timeout: 0, // context times out immediately
server: tt.server,
clusterInfo: &fakeCluster{id: 1},
timeout: 0, // context times out immediately
server: tt.server,
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, tt.req)
@@ -1435,7 +1435,7 @@ func TestBadServeKeys(t *testing.T) {
}
if rw.Code != http.StatusMethodNotAllowed {
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
}
@@ -1482,10 +1482,10 @@ func TestServeKeysGood(t *testing.T) {
}
for i, tt := range tests {
h := &keysHandler{
timeout: time.Hour,
server: server,
timer: &dummyRaftTimer{},
clusterInfo: &fakeCluster{id: 1},
timeout: time.Hour,
server: server,
timer: &dummyRaftTimer{},
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, tt.req)
@@ -1506,10 +1506,10 @@ func TestServeKeysEvent(t *testing.T) {
},
}
h := &keysHandler{
timeout: time.Hour,
server: server,
clusterInfo: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
timeout: time.Hour,
server: server,
cluster: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
}
rw := httptest.NewRecorder()
@@ -1528,7 +1528,7 @@ func TestServeKeysEvent(t *testing.T) {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@@ -1550,10 +1550,10 @@ func TestServeKeysWatch(t *testing.T) {
},
}
h := &keysHandler{
timeout: time.Hour,
server: server,
clusterInfo: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
timeout: time.Hour,
server: server,
cluster: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
}
go func() {
ec <- &store.Event{
@@ -1578,7 +1578,7 @@ func TestServeKeysWatch(t *testing.T) {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}

View File

@@ -28,9 +28,9 @@ const (
)
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
clusterInfo: clusterInfo,
cluster: cluster,
}
mux := http.NewServeMux()
@@ -43,20 +43,20 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler
}
type peerMembersHandler struct {
clusterInfo etcdserver.ClusterInfo
cluster etcdserver.Cluster
}
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
if r.URL.Path != peerMembersPrefix {
http.Error(w, "bad path", http.StatusBadRequest)
return
}
ms := h.clusterInfo.Members()
ms := h.cluster.Members()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(ms); err != nil {
log.Printf("etcdhttp: %v", err)

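Wiring the peer handler matches the etcdmain call site shown at the top of this commit; a minimal sketch (the http.Server construction is a plausible fill-in, since the hunk truncates the original loop body):

    ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())
    for _, l := range plns {
            go func(l net.Listener) {
                    // serve raft and peer-membership requests on each peer listener
                    log.Fatal((&http.Server{Handler: ph}).Serve(l))
            }(l)
    }
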
View File

@@ -76,7 +76,7 @@ func TestServeMembersFails(t *testing.T) {
}
for i, tt := range tests {
rw := httptest.NewRecorder()
h := &peerMembersHandler{clusterInfo: nil}
h := &peerMembersHandler{cluster: nil}
h.ServeHTTP(rw, &http.Request{Method: tt.method})
if rw.Code != tt.wcode {
t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
@@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &peerMembersHandler{clusterInfo: cluster}
h := &peerMembersHandler{cluster: cluster}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
if err != nil {
t.Fatal(err)

View File

@@ -192,7 +192,7 @@ func (r *raftNode) resumeSending() {
p.Resume()
}
func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
@@ -231,7 +231,7 @@ func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n r
return
}
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@@ -260,7 +260,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Clust
return id, cl, n, s, w
}
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term

View File

@@ -154,7 +154,7 @@ type EtcdServer struct {
id types.ID
attributes Attributes
Cluster *Cluster
cluster *cluster
store store.Store
@@ -178,7 +178,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
var n raft.Node
var s *raft.MemoryStorage
var id types.ID
var cl *Cluster
var cl *cluster
// Run the migrations.
dataVer, err := version.DetectDataDir(cfg.DataDir)
@@ -198,7 +198,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
@@ -218,7 +218,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := cfg.VerifyBootstrap(); err != nil {
return nil, err
}
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
@@ -238,7 +238,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if checkDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
}
if cl, err = NewCluster(cfg.InitialClusterToken, urlsmap); err != nil {
if cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
return nil, err
}
}
@@ -302,7 +302,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
},
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cl,
cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
@@ -379,10 +379,12 @@ func (s *EtcdServer) purgeFile() {
func (s *EtcdServer) ID() types.ID { return s.id }
func (s *EtcdServer) Cluster() Cluster { return s.cluster }
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.Cluster.IsIDRemoved(types.ID(m.From)) {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
}
@@ -392,7 +394,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
return s.r.Step(ctx, m)
}
func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.Cluster.IsIDRemoved(types.ID(id)) }
func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
@@ -432,11 +434,11 @@ func (s *EtcdServer) run() {
if err := s.store.Recovery(apply.snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
s.cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
for _, m := range s.cluster.Members() {
if m.ID == s.ID() {
continue
}
@@ -700,7 +702,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
cancel()
switch err {
case nil:
log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.Cluster.ID())
log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.cluster.ID())
return
case ErrStopped:
log.Printf("etcdserver: aborting publish because server is stopped")
@@ -713,7 +715,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
func (s *EtcdServer) send(ms []raftpb.Message) {
for i, _ := range ms {
if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) {
if s.cluster.IsIDRemoved(types.ID(ms[i].To)) {
ms[i].To = 0
}
}
@@ -791,10 +793,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
log.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
s.Cluster.UpdateAttributes(id, attr)
s.cluster.UpdateAttributes(id, attr)
}
if r.Path == path.Join(StoreClusterPrefix, "version") {
s.Cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
}
return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
}
@@ -819,7 +821,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)
return false, err
@@ -834,21 +836,21 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID")
}
s.Cluster.AddMember(m)
s.cluster.AddMember(m)
if m.ID == s.id {
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
} else {
s.r.transport.AddPeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
}
case raftpb.ConfChangeRemoveNode:
id := types.ID(cc.NodeID)
s.Cluster.RemoveMember(id)
s.cluster.RemoveMember(id)
if id == s.id {
return true, nil
} else {
s.r.transport.RemovePeer(id)
log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
log.Printf("etcdserver: removed member %s from cluster %s", id, s.cluster.ID())
}
case raftpb.ConfChangeUpdateNode:
m := new(Member)
@@ -858,12 +860,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID")
}
s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
if m.ID == s.id {
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
} else {
s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
}
}
return false, nil
@@ -917,10 +919,10 @@ func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
func (s *EtcdServer) ClusterVersion() *semver.Version {
if s.Cluster == nil {
if s.cluster == nil {
return nil
}
return s.Cluster.Version()
return s.cluster.Version()
}
// monitorVersions checks the member's version every monitorVersion interval.
@@ -940,7 +942,7 @@ func (s *EtcdServer) monitorVersions() {
continue
}
v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
v := decideClusterVersion(getVersions(s.cluster, s.cfg.Transport))
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
@@ -952,7 +954,7 @@ func (s *EtcdServer) monitorVersions() {
// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if s.Cluster.Version() == nil {
if s.cluster.Version() == nil {
if v != nil {
go s.updateClusterVersion(v.String())
} else {
@@ -963,17 +965,17 @@ func (s *EtcdServer) monitorVersions() {
// update cluster version only if the decided version is greater than
// the current cluster version
if v != nil && s.Cluster.Version().LessThan(*v) {
if v != nil && s.cluster.Version().LessThan(*v) {
go s.updateClusterVersion(v.String())
}
}
}
func (s *EtcdServer) updateClusterVersion(ver string) {
if s.Cluster.Version() == nil {
if s.cluster.Version() == nil {
log.Printf("etcdserver: setting up the initial cluster version to %v", ver)
} else {
log.Printf("etcdserver: updating the cluster version from %v to %v", s.Cluster.Version(), ver)
log.Printf("etcdserver: updating the cluster version from %v to %v", s.cluster.Version(), ver)
}
req := pb.Request{
Method: "PUT",

View File

@@ -393,7 +393,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
cl := newTestCluster([]*Member{{ID: 1}})
srv := &EtcdServer{
store: &storeRecorder{},
Cluster: cl,
cluster: cl,
}
req := pb.Request{
Method: "PUT",
@@ -453,7 +453,7 @@ func TestApplyConfChangeError(t *testing.T) {
n := &nodeRecorder{}
srv := &EtcdServer{
r: raftNode{Node: n},
Cluster: cl,
cluster: cl,
}
_, err := srv.applyConfChange(tt.cc, nil)
if err != tt.werr {
@@ -484,7 +484,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
Node: &nodeRecorder{},
transport: &nopTransporter{},
},
Cluster: cl,
cluster: cl,
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
@@ -780,7 +780,7 @@ func TestRecvSnapshot(t *testing.T) {
raftStorage: raft.NewMemoryStorage(),
},
store: st,
Cluster: cl,
cluster: cl,
}
s.start()
@@ -815,7 +815,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
}
s.start()
@@ -859,7 +859,7 @@ func TestAddMember(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
@@ -898,7 +898,7 @@ func TestRemoveMember(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
@@ -936,7 +936,7 @@ func TestUpdateMember(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
@@ -969,7 +969,7 @@ func TestPublish(t *testing.T) {
id: 1,
r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
Cluster: &Cluster{},
cluster: &cluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@@ -1010,7 +1010,7 @@ func TestPublishStopped(t *testing.T) {
Node: &nodeRecorder{},
transport: &nopTransporter{},
},
Cluster: &Cluster{},
cluster: &cluster{},
w: &waitRecorder{},
done: make(chan struct{}),
stop: make(chan struct{}),
@@ -1051,7 +1051,7 @@ func TestUpdateVersion(t *testing.T) {
id: 1,
r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
Cluster: &Cluster{},
cluster: &cluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@@ -1137,7 +1137,7 @@ func TestGetOtherPeerURLs(t *testing.T) {
},
}
for i, tt := range tests {
cl := NewClusterFromMembers("", types.ID(0), tt.membs)
cl := newClusterFromMembers("", types.ID(0), tt.membs)
urls := getRemotePeerURLs(cl, tt.self)
if !reflect.DeepEqual(urls, tt.wurls) {
t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)

View File

@@ -695,7 +695,7 @@ func (m *member) Launch() error {
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{