Merge pull request #2821 from yichengq/private-cluster

etcdserver: stop exposing Cluster struct
This commit is contained in:
Yicheng Qin 2015-05-13 10:26:48 -07:00
commit 75ee7f4aa1
13 changed files with 168 additions and 166 deletions

View File

@ -203,7 +203,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Handler: etcdhttp.NewClientHandler(s),
Info: cfg.corsInfo,
}
ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())
// Start the peer server in a goroutine
for _, l := range plns {
go func(l net.Listener) {

View File

@ -39,7 +39,7 @@ const (
attributesSuffix = "attributes"
)
type ClusterInfo interface {
type Cluster interface {
// ID returns the cluster ID
ID() types.ID
// ClientURLs returns an aggregate set of all URLs on which this
@ -56,7 +56,7 @@ type ClusterInfo interface {
}
// Cluster is a list of Members that belong to the same raft cluster
type Cluster struct {
type cluster struct {
id types.ID
token string
store store.Store
@ -69,9 +69,9 @@ type Cluster struct {
removed map[types.ID]bool
}
func NewCluster(token string, initial types.URLsMap) (*Cluster, error) {
func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error) {
c := newCluster(token)
for name, urls := range initial {
for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
return nil, fmt.Errorf("member exists with identical ID %v", m)
@ -85,7 +85,7 @@ func NewCluster(token string, initial types.URLsMap) (*Cluster, error) {
return c, nil
}
func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster {
func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster {
c := newCluster(token)
c.id = id
for _, m := range membs {
@ -94,17 +94,17 @@ func NewClusterFromMembers(token string, id types.ID, membs []*Member) *Cluster
return c
}
func newCluster(token string) *Cluster {
return &Cluster{
func newCluster(token string) *cluster {
return &cluster{
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
}
}
func (c *Cluster) ID() types.ID { return c.id }
func (c *cluster) ID() types.ID { return c.id }
func (c *Cluster) Members() []*Member {
func (c *cluster) Members() []*Member {
c.Lock()
defer c.Unlock()
var sms SortableMemberSlice
@ -115,7 +115,7 @@ func (c *Cluster) Members() []*Member {
return []*Member(sms)
}
func (c *Cluster) Member(id types.ID) *Member {
func (c *cluster) Member(id types.ID) *Member {
c.Lock()
defer c.Unlock()
return c.members[id].Clone()
@ -123,7 +123,7 @@ func (c *Cluster) Member(id types.ID) *Member {
// MemberByName returns a Member with the given name if exists.
// If more than one member has the given name, it will panic.
func (c *Cluster) MemberByName(name string) *Member {
func (c *cluster) MemberByName(name string) *Member {
c.Lock()
defer c.Unlock()
var memb *Member
@ -138,7 +138,7 @@ func (c *Cluster) MemberByName(name string) *Member {
return memb.Clone()
}
func (c *Cluster) MemberIDs() []types.ID {
func (c *cluster) MemberIDs() []types.ID {
c.Lock()
defer c.Unlock()
var ids []types.ID
@ -149,7 +149,7 @@ func (c *Cluster) MemberIDs() []types.ID {
return ids
}
func (c *Cluster) IsIDRemoved(id types.ID) bool {
func (c *cluster) IsIDRemoved(id types.ID) bool {
c.Lock()
defer c.Unlock()
return c.removed[id]
@ -157,7 +157,7 @@ func (c *Cluster) IsIDRemoved(id types.ID) bool {
// PeerURLs returns a list of all peer addresses.
// The returned list is sorted in ascending lexicographical order.
func (c *Cluster) PeerURLs() []string {
func (c *cluster) PeerURLs() []string {
c.Lock()
defer c.Unlock()
urls := make([]string, 0)
@ -172,7 +172,7 @@ func (c *Cluster) PeerURLs() []string {
// ClientURLs returns a list of all client addresses.
// The returned list is sorted in ascending lexicographical order.
func (c *Cluster) ClientURLs() []string {
func (c *cluster) ClientURLs() []string {
c.Lock()
defer c.Unlock()
urls := make([]string, 0)
@ -185,7 +185,7 @@ func (c *Cluster) ClientURLs() []string {
return urls
}
func (c *Cluster) String() string {
func (c *cluster) String() string {
c.Lock()
defer c.Unlock()
b := &bytes.Buffer{}
@ -203,7 +203,7 @@ func (c *Cluster) String() string {
return b.String()
}
func (c *Cluster) genID() {
func (c *cluster) genID() {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))
for i, id := range mIDs {
@ -213,18 +213,18 @@ func (c *Cluster) genID() {
c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
}
func (c *Cluster) SetID(id types.ID) { c.id = id }
func (c *cluster) SetID(id types.ID) { c.id = id }
func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) Recover() {
func (c *cluster) Recover() {
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
}
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
members, removed := membersFromStore(c.store)
id := types.ID(cc.NodeID)
if removed[id] {
@ -285,7 +285,7 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// AddMember adds a new Member into the cluster, and saves the given member's
// raftAttributes into the store. The given member should have empty attributes.
// A Member with a matching id must not exist.
func (c *Cluster) AddMember(m *Member) {
func (c *cluster) AddMember(m *Member) {
c.Lock()
defer c.Unlock()
b, err := json.Marshal(m.RaftAttributes)
@ -301,7 +301,7 @@ func (c *Cluster) AddMember(m *Member) {
// RemoveMember removes a member from the store.
// The given id MUST exist, or the function panics.
func (c *Cluster) RemoveMember(id types.ID) {
func (c *cluster) RemoveMember(id types.ID) {
c.Lock()
defer c.Unlock()
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
@ -314,14 +314,14 @@ func (c *Cluster) RemoveMember(id types.ID) {
c.removed[id] = true
}
func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) {
c.Lock()
defer c.Unlock()
c.members[id].Attributes = attr
// TODO: update store in this function
}
func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.Lock()
defer c.Unlock()
b, err := json.Marshal(raftAttr)
@ -335,7 +335,7 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.members[id].RaftAttributes = raftAttr
}
func (c *Cluster) Version() *semver.Version {
func (c *cluster) Version() *semver.Version {
c.Lock()
defer c.Unlock()
if c.version == nil {
@ -344,7 +344,7 @@ func (c *Cluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String()))
}
func (c *Cluster) SetVersion(ver *semver.Version) {
func (c *cluster) SetVersion(ver *semver.Version) {
c.Lock()
defer c.Unlock()
if c.version != nil {
@ -401,7 +401,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version {
// with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster.
// If the validation fails, an error will be returned.
func ValidateClusterAndAssignIDs(local *Cluster, existing *Cluster) error {
func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
ems := existing.Members()
lms := local.Members()
if len(ems) != len(lms) {

View File

@ -470,7 +470,7 @@ func TestClusterAddMember(t *testing.T) {
}
func TestClusterMembers(t *testing.T) {
cls := &Cluster{
cls := &cluster{
members: map[types.ID]*Member{
1: &Member{ID: 1},
20: &Member{ID: 20},
@ -521,8 +521,8 @@ func TestNodeToMember(t *testing.T) {
}
}
func newTestCluster(membs []*Member) *Cluster {
c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
func newTestCluster(membs []*Member) *cluster {
c := &cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
for _, m := range membs {
c.members[m.ID] = m
}

View File

@ -30,7 +30,7 @@ import (
// isMemberBootstrapped tries to check if the given member has been bootstrapped
// in the given cluster.
func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool {
func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), false, tr)
if err != nil {
return false
@ -51,12 +51,12 @@ func isMemberBootstrapped(cl *Cluster, member string, tr *http.Transport) bool {
// these URLs. The first URL to provide a response is used. If no URLs provide
// a response, or a Cluster cannot be successfully created from a received
// response, an error is returned.
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*Cluster, error) {
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) {
return getClusterFromRemotePeers(urls, true, tr)
}
// If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*Cluster, error) {
func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (*cluster, error) {
cc := &http.Client{
Transport: tr,
Timeout: time.Second,
@ -90,14 +90,14 @@ func getClusterFromRemotePeers(urls []string, logerr bool, tr *http.Transport) (
}
continue
}
return NewClusterFromMembers("", id, membs), nil
return newClusterFromMembers("", id, membs), nil
}
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
}
// getRemotePeerURLs returns peer urls of remote members in the cluster. The
// returned list is sorted in ascending lexicographical order.
func getRemotePeerURLs(cl ClusterInfo, local string) []string {
func getRemotePeerURLs(cl Cluster, local string) []string {
us := make([]string, 0)
for _, m := range cl.Members() {
if m.Name == local {
@ -113,7 +113,7 @@ func getRemotePeerURLs(cl ClusterInfo, local string) []string {
// The key of the returned map is the member's ID. The value of the returned map
// is the semver version string. If it fails to get the version of a member, the key
// will be an empty string.
func getVersions(cl ClusterInfo, tr *http.Transport) map[string]string {
func getVersions(cl Cluster, tr *http.Transport) map[string]string {
members := cl.Members()
vers := make(map[string]string)
for _, m := range members {

View File

@ -62,11 +62,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
sec := security.NewStore(server, defaultServerTimeout)
kh := &keysHandler{
sec: sec,
server: server,
clusterInfo: server.Cluster,
timer: server,
timeout: defaultServerTimeout,
sec: sec,
server: server,
cluster: server.Cluster(),
timer: server,
timeout: defaultServerTimeout,
}
sh := &statsHandler{
@ -74,19 +74,19 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
}
mh := &membersHandler{
sec: sec,
server: server,
clusterInfo: server.Cluster,
clock: clockwork.NewRealClock(),
sec: sec,
server: server,
cluster: server.Cluster(),
clock: clockwork.NewRealClock(),
}
dmh := &deprecatedMachinesHandler{
clusterInfo: server.Cluster,
cluster: server.Cluster(),
}
sech := &securityHandler{
sec: sec,
clusterInfo: server.Cluster,
sec: sec,
cluster: server.Cluster(),
}
mux := http.NewServeMux()
@ -109,11 +109,11 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
}
type keysHandler struct {
sec *security.Store
server etcdserver.Server
clusterInfo etcdserver.ClusterInfo
timer etcdserver.RaftTimer
timeout time.Duration
sec *security.Store
server etcdserver.Server
cluster etcdserver.Cluster
timer etcdserver.RaftTimer
timeout time.Duration
}
func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -121,7 +121,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
@ -159,22 +159,22 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
type deprecatedMachinesHandler struct {
clusterInfo etcdserver.ClusterInfo
cluster etcdserver.Cluster
}
func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET", "HEAD") {
return
}
endpoints := h.clusterInfo.ClientURLs()
endpoints := h.cluster.ClientURLs()
w.Write([]byte(strings.Join(endpoints, ", ")))
}
type membersHandler struct {
sec *security.Store
server etcdserver.Server
clusterInfo etcdserver.ClusterInfo
clock clockwork.Clock
sec *security.Store
server etcdserver.Server
cluster etcdserver.Cluster
clock clockwork.Clock
}
func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -185,7 +185,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
defer cancel()
@ -194,7 +194,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case "GET":
switch trimPrefix(r.URL.Path, membersPrefix) {
case "":
mc := newMemberCollection(h.clusterInfo.Members())
mc := newMemberCollection(h.cluster.Members())
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(mc); err != nil {
log.Printf("etcdhttp: %v", err)
@ -205,7 +205,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeError(w, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election"))
return
}
m := newMember(h.clusterInfo.Member(id))
m := newMember(h.cluster.Member(id))
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(m); err != nil {
log.Printf("etcdhttp: %v", err)

View File

@ -28,8 +28,8 @@ import (
)
type securityHandler struct {
sec *security.Store
clusterInfo etcdserver.ClusterInfo
sec *security.Store
cluster etcdserver.Cluster
}
func hasWriteRootAccess(sec *security.Store, r *http.Request) bool {
@ -140,7 +140,7 @@ func (sh *securityHandler) baseRoles(w http.ResponseWriter, r *http.Request) {
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
w.Header().Set("Content-Type", "application/json")
var rolesCollections struct {
Roles []string `json:"roles"`
@ -185,7 +185,7 @@ func (sh *securityHandler) forRole(w http.ResponseWriter, r *http.Request, role
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
switch r.Method {
case "GET":
@ -245,7 +245,7 @@ func (sh *securityHandler) baseUsers(w http.ResponseWriter, r *http.Request) {
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
w.Header().Set("Content-Type", "application/json")
var usersCollections struct {
Users []string `json:"users"`
@ -290,7 +290,7 @@ func (sh *securityHandler) forUser(w http.ResponseWriter, r *http.Request, user
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
switch r.Method {
case "GET":
@ -360,7 +360,7 @@ func (sh *securityHandler) enableDisable(w http.ResponseWriter, r *http.Request)
writeNoAuth(w)
return
}
w.Header().Set("X-Etcd-Cluster-ID", sh.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", sh.cluster.ID().String())
w.Header().Set("Content-Type", "application/json")
isEnabled := sh.sec.SecurityEnabled()
switch r.Method {

View File

@ -564,9 +564,9 @@ func TestServeMembers(t *testing.T) {
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &membersHandler{
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
clusterInfo: cluster,
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
cluster: cluster,
}
wmc := string(`{"members":[{"id":"c","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]},{"id":"d","name":"","peerURLs":[],"clientURLs":["http://localhost:8081"]}]}`)
@ -617,9 +617,9 @@ func TestServeLeader(t *testing.T) {
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &membersHandler{
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
clusterInfo: cluster,
server: &serverRecorder{},
clock: clockwork.NewFakeClock(),
cluster: cluster,
}
wmc := string(`{"id":"1","name":"","peerURLs":[],"clientURLs":["http://localhost:8080"]}`)
@ -669,9 +669,9 @@ func TestServeMembersCreate(t *testing.T) {
req.Header.Set("Content-Type", "application/json")
s := &serverRecorder{}
h := &membersHandler{
server: s,
clock: clockwork.NewFakeClock(),
clusterInfo: &fakeCluster{id: 1},
server: s,
clock: clockwork.NewFakeClock(),
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
@ -687,7 +687,7 @@ func TestServeMembersCreate(t *testing.T) {
t.Errorf("content-type = %s, want %s", gct, wct)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@ -718,8 +718,8 @@ func TestServeMembersDelete(t *testing.T) {
}
s := &serverRecorder{}
h := &membersHandler{
server: s,
clusterInfo: &fakeCluster{id: 1},
server: s,
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
@ -730,7 +730,7 @@ func TestServeMembersDelete(t *testing.T) {
t.Errorf("code=%d, want %d", rw.Code, wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@ -754,9 +754,9 @@ func TestServeMembersUpdate(t *testing.T) {
req.Header.Set("Content-Type", "application/json")
s := &serverRecorder{}
h := &membersHandler{
server: s,
clock: clockwork.NewFakeClock(),
clusterInfo: &fakeCluster{id: 1},
server: s,
clock: clockwork.NewFakeClock(),
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
@ -768,7 +768,7 @@ func TestServeMembersUpdate(t *testing.T) {
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@ -1046,9 +1046,9 @@ func TestServeMembersFail(t *testing.T) {
}
for i, tt := range tests {
h := &membersHandler{
server: tt.server,
clusterInfo: &fakeCluster{id: 1},
clock: clockwork.NewFakeClock(),
server: tt.server,
cluster: &fakeCluster{id: 1},
clock: clockwork.NewFakeClock(),
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, tt.req)
@ -1057,7 +1057,7 @@ func TestServeMembersFail(t *testing.T) {
}
if rw.Code != http.StatusMethodNotAllowed {
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
}
@ -1141,7 +1141,7 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
{"POST", http.StatusMethodNotAllowed},
}
m := &deprecatedMachinesHandler{clusterInfo: &etcdserver.Cluster{}}
m := &deprecatedMachinesHandler{cluster: &fakeCluster{}}
s := httptest.NewServer(m)
defer s.Close()
@ -1170,7 +1170,7 @@ func TestServeMachines(t *testing.T) {
if err != nil {
t.Fatal(err)
}
h := &deprecatedMachinesHandler{clusterInfo: cluster}
h := &deprecatedMachinesHandler{cluster: cluster}
h.ServeHTTP(writer, req)
w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
if g := writer.Body.String(); g != w {
@ -1424,9 +1424,9 @@ func TestBadServeKeys(t *testing.T) {
}
for i, tt := range testBadCases {
h := &keysHandler{
timeout: 0, // context times out immediately
server: tt.server,
clusterInfo: &fakeCluster{id: 1},
timeout: 0, // context times out immediately
server: tt.server,
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, tt.req)
@ -1435,7 +1435,7 @@ func TestBadServeKeys(t *testing.T) {
}
if rw.Code != http.StatusMethodNotAllowed {
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
}
@ -1482,10 +1482,10 @@ func TestServeKeysGood(t *testing.T) {
}
for i, tt := range tests {
h := &keysHandler{
timeout: time.Hour,
server: server,
timer: &dummyRaftTimer{},
clusterInfo: &fakeCluster{id: 1},
timeout: time.Hour,
server: server,
timer: &dummyRaftTimer{},
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, tt.req)
@ -1506,10 +1506,10 @@ func TestServeKeysEvent(t *testing.T) {
},
}
h := &keysHandler{
timeout: time.Hour,
server: server,
clusterInfo: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
timeout: time.Hour,
server: server,
cluster: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
}
rw := httptest.NewRecorder()
@ -1528,7 +1528,7 @@ func TestServeKeysEvent(t *testing.T) {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
@ -1550,10 +1550,10 @@ func TestServeKeysWatch(t *testing.T) {
},
}
h := &keysHandler{
timeout: time.Hour,
server: server,
clusterInfo: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
timeout: time.Hour,
server: server,
cluster: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
}
go func() {
ec <- &store.Event{
@ -1578,7 +1578,7 @@ func TestServeKeysWatch(t *testing.T) {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.clusterInfo.ID().String()
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}

View File

@ -28,9 +28,9 @@ const (
)
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler) http.Handler {
func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
clusterInfo: clusterInfo,
cluster: cluster,
}
mux := http.NewServeMux()
@ -43,20 +43,20 @@ func NewPeerHandler(clusterInfo etcdserver.ClusterInfo, raftHandler http.Handler
}
type peerMembersHandler struct {
clusterInfo etcdserver.ClusterInfo
cluster etcdserver.Cluster
}
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
if r.URL.Path != peerMembersPrefix {
http.Error(w, "bad path", http.StatusBadRequest)
return
}
ms := h.clusterInfo.Members()
ms := h.cluster.Members()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(ms); err != nil {
log.Printf("etcdhttp: %v", err)

View File

@ -76,7 +76,7 @@ func TestServeMembersFails(t *testing.T) {
}
for i, tt := range tests {
rw := httptest.NewRecorder()
h := &peerMembersHandler{clusterInfo: nil}
h := &peerMembersHandler{cluster: nil}
h.ServeHTTP(rw, &http.Request{Method: tt.method})
if rw.Code != tt.wcode {
t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
@ -91,7 +91,7 @@ func TestServeMembersGet(t *testing.T) {
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
}
h := &peerMembersHandler{clusterInfo: cluster}
h := &peerMembersHandler{cluster: cluster}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
if err != nil {
t.Fatal(err)

View File

@ -192,7 +192,7 @@ func (r *raftNode) resumeSending() {
p.Resume()
}
func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
@ -231,7 +231,7 @@ func startNode(cfg *ServerConfig, cl *Cluster, ids []types.ID) (id types.ID, n r
return
}
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -260,7 +260,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Clust
return id, cl, n, s, w
}
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *Cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term

View File

@ -154,7 +154,7 @@ type EtcdServer struct {
id types.ID
attributes Attributes
Cluster *Cluster
cluster *cluster
store store.Store
@ -178,7 +178,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
var n raft.Node
var s *raft.MemoryStorage
var id types.ID
var cl *Cluster
var cl *cluster
// Run the migrations.
dataVer, err := version.DetectDataDir(cfg.DataDir)
@ -198,7 +198,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
@ -218,7 +218,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := cfg.VerifyBootstrap(); err != nil {
return nil, err
}
cl, err = NewCluster(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
@ -238,7 +238,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if checkDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
}
if cl, err = NewCluster(cfg.InitialClusterToken, urlsmap); err != nil {
if cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
return nil, err
}
}
@ -302,7 +302,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
},
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cl,
cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
@ -379,10 +379,12 @@ func (s *EtcdServer) purgeFile() {
func (s *EtcdServer) ID() types.ID { return s.id }
func (s *EtcdServer) Cluster() Cluster { return s.cluster }
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.Cluster.IsIDRemoved(types.ID(m.From)) {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
}
@ -392,7 +394,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
return s.r.Step(ctx, m)
}
func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.Cluster.IsIDRemoved(types.ID(id)) }
func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
@ -432,11 +434,11 @@ func (s *EtcdServer) run() {
if err := s.store.Recovery(apply.snapshot.Data); err != nil {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
s.cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
for _, m := range s.cluster.Members() {
if m.ID == s.ID() {
continue
}
@ -700,7 +702,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
cancel()
switch err {
case nil:
log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.Cluster.ID())
log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.cluster.ID())
return
case ErrStopped:
log.Printf("etcdserver: aborting publish because server is stopped")
@ -713,7 +715,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
func (s *EtcdServer) send(ms []raftpb.Message) {
for i, _ := range ms {
if s.Cluster.IsIDRemoved(types.ID(ms[i].To)) {
if s.cluster.IsIDRemoved(types.ID(ms[i].To)) {
ms[i].To = 0
}
}
@ -791,10 +793,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
log.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
s.Cluster.UpdateAttributes(id, attr)
s.cluster.UpdateAttributes(id, attr)
}
if r.Path == path.Join(StoreClusterPrefix, "version") {
s.Cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
}
return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
}
@ -819,7 +821,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)
return false, err
@ -834,21 +836,21 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID")
}
s.Cluster.AddMember(m)
s.cluster.AddMember(m)
if m.ID == s.id {
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
} else {
s.r.transport.AddPeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
}
case raftpb.ConfChangeRemoveNode:
id := types.ID(cc.NodeID)
s.Cluster.RemoveMember(id)
s.cluster.RemoveMember(id)
if id == s.id {
return true, nil
} else {
s.r.transport.RemovePeer(id)
log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
log.Printf("etcdserver: removed member %s from cluster %s", id, s.cluster.ID())
}
case raftpb.ConfChangeUpdateNode:
m := new(Member)
@ -858,12 +860,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID")
}
s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
if m.ID == s.id {
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
} else {
s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.cluster.ID())
}
}
return false, nil
@ -917,10 +919,10 @@ func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
func (s *EtcdServer) ClusterVersion() *semver.Version {
if s.Cluster == nil {
if s.cluster == nil {
return nil
}
return s.Cluster.Version()
return s.cluster.Version()
}
// monitorVersions checks the member's version every monitorVersion interval.
@ -940,7 +942,7 @@ func (s *EtcdServer) monitorVersions() {
continue
}
v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
v := decideClusterVersion(getVersions(s.cluster, s.cfg.Transport))
if v != nil {
// only keep major.minor version for comparasion
v = &semver.Version{
@ -952,7 +954,7 @@ func (s *EtcdServer) monitorVersions() {
// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if s.Cluster.Version() == nil {
if s.cluster.Version() == nil {
if v != nil {
go s.updateClusterVersion(v.String())
} else {
@ -963,17 +965,17 @@ func (s *EtcdServer) monitorVersions() {
// update cluster version only if the decided version is greater than
// the current cluster version
if v != nil && s.Cluster.Version().LessThan(*v) {
if v != nil && s.cluster.Version().LessThan(*v) {
go s.updateClusterVersion(v.String())
}
}
}
func (s *EtcdServer) updateClusterVersion(ver string) {
if s.Cluster.Version() == nil {
if s.cluster.Version() == nil {
log.Printf("etcdsever: setting up the initial cluster version to %v", ver)
} else {
log.Printf("etcdsever: updating the cluster version from %v to %v", s.Cluster.Version(), ver)
log.Printf("etcdsever: updating the cluster version from %v to %v", s.cluster.Version(), ver)
}
req := pb.Request{
Method: "PUT",

View File

@ -393,7 +393,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
cl := newTestCluster([]*Member{{ID: 1}})
srv := &EtcdServer{
store: &storeRecorder{},
Cluster: cl,
cluster: cl,
}
req := pb.Request{
Method: "PUT",
@ -453,7 +453,7 @@ func TestApplyConfChangeError(t *testing.T) {
n := &nodeRecorder{}
srv := &EtcdServer{
r: raftNode{Node: n},
Cluster: cl,
cluster: cl,
}
_, err := srv.applyConfChange(tt.cc, nil)
if err != tt.werr {
@ -484,7 +484,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
Node: &nodeRecorder{},
transport: &nopTransporter{},
},
Cluster: cl,
cluster: cl,
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
@ -780,7 +780,7 @@ func TestRecvSnapshot(t *testing.T) {
raftStorage: raft.NewMemoryStorage(),
},
store: st,
Cluster: cl,
cluster: cl,
}
s.start()
@ -815,7 +815,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
}
s.start()
@ -859,7 +859,7 @@ func TestAddMember(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
@ -898,7 +898,7 @@ func TestRemoveMember(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
@ -936,7 +936,7 @@ func TestUpdateMember(t *testing.T) {
transport: &nopTransporter{},
},
store: st,
Cluster: cl,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
@ -969,7 +969,7 @@ func TestPublish(t *testing.T) {
id: 1,
r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
Cluster: &Cluster{},
cluster: &cluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -1010,7 +1010,7 @@ func TestPublishStopped(t *testing.T) {
Node: &nodeRecorder{},
transport: &nopTransporter{},
},
Cluster: &Cluster{},
cluster: &cluster{},
w: &waitRecorder{},
done: make(chan struct{}),
stop: make(chan struct{}),
@ -1051,7 +1051,7 @@ func TestUpdateVersion(t *testing.T) {
id: 1,
r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
Cluster: &Cluster{},
cluster: &cluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -1137,7 +1137,7 @@ func TestGetOtherPeerURLs(t *testing.T) {
},
}
for i, tt := range tests {
cl := NewClusterFromMembers("", types.ID(0), tt.membs)
cl := newClusterFromMembers("", types.ID(0), tt.membs)
urls := getRemotePeerURLs(cl, tt.self)
if !reflect.DeepEqual(urls, tt.wurls) {
t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)

View File

@ -695,7 +695,7 @@ func (m *member) Launch() error {
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster, m.s.RaftHandler())}
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{