Merge pull request #4994 from xiang90/clu

etcdserver: move membership related code to membership pkg
This commit is contained in:
Xiang Li 2016-04-07 14:39:18 -07:00
commit f31105bc08
21 changed files with 319 additions and 255 deletions

View File

@ -252,7 +252,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
// wait for cluster to start // wait for cluster to start
readyC := make(chan error, cfg.clusterSize+cfg.proxySize) readyC := make(chan error, cfg.clusterSize+cfg.proxySize)
readyStr := "etcdserver: set the initial cluster version to" readyStr := "membership: set the initial cluster version to"
for i := range etcdCfgs { for i := range etcdCfgs {
go func(etcdp *etcdProcess) { go func(etcdp *etcdProcess) {
rs := readyStr rs := readyStr

View File

@ -15,7 +15,7 @@
package api package api
import ( import (
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
@ -29,10 +29,10 @@ type Cluster interface {
// cluster is listening for client requests // cluster is listening for client requests
ClientURLs() []string ClientURLs() []string
// Members returns a slice of members sorted by their ID // Members returns a slice of members sorted by their ID
Members() []*etcdserver.Member Members() []*membership.Member
// Member retrieves a particular member based on ID, or nil if the // Member retrieves a particular member based on ID, or nil if the
// member does not exist in the cluster // member does not exist in the cluster
Member(id types.ID) *etcdserver.Member Member(id types.ID) *membership.Member
// IsIDRemoved checks whether the given ID has been removed from this // IsIDRemoved checks whether the given ID has been removed from this
// cluster at some point in the past // cluster at some point in the past
IsIDRemoved(id types.ID) bool IsIDRemoved(id types.ID) bool

View File

@ -34,6 +34,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
"github.com/coreos/etcd/etcdserver/auth" "github.com/coreos/etcd/etcdserver/auth"
"github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft"
@ -248,10 +249,10 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
now := h.clock.Now() now := h.clock.Now()
m := etcdserver.NewMember("", req.PeerURLs, "", &now) m := membership.NewMember("", req.PeerURLs, "", &now)
err := h.server.AddMember(ctx, *m) err := h.server.AddMember(ctx, *m)
switch { switch {
case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists: case err == membership.ErrIDExists || err == membership.ErrPeerURLexists:
writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
return return
case err != nil: case err != nil:
@ -272,9 +273,9 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
err := h.server.RemoveMember(ctx, uint64(id)) err := h.server.RemoveMember(ctx, uint64(id))
switch { switch {
case err == etcdserver.ErrIDRemoved: case err == membership.ErrIDRemoved:
writeError(w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) writeError(w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
case err == etcdserver.ErrIDNotFound: case err == membership.ErrIDNotFound:
writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
case err != nil: case err != nil:
plog.Errorf("error removing member %s (%v)", id, err) plog.Errorf("error removing member %s (%v)", id, err)
@ -291,15 +292,15 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ok := unmarshalRequest(r, &req, w); !ok { if ok := unmarshalRequest(r, &req, w); !ok {
return return
} }
m := etcdserver.Member{ m := membership.Member{
ID: id, ID: id,
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, RaftAttributes: membership.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
} }
err := h.server.UpdateMember(ctx, m) err := h.server.UpdateMember(ctx, m)
switch { switch {
case err == etcdserver.ErrPeerURLexists: case err == membership.ErrPeerURLexists:
writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
case err == etcdserver.ErrIDNotFound: case err == membership.ErrIDNotFound:
writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
case err != nil: case err != nil:
plog.Errorf("error updating member %s (%v)", m.ID, err) plog.Errorf("error updating member %s (%v)", m.ID, err)
@ -804,7 +805,7 @@ func trimPrefix(p, prefix string) (s string) {
return return
} }
func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection { func newMemberCollection(ms []*membership.Member) *httptypes.MemberCollection {
c := httptypes.MemberCollection(make([]httptypes.Member, len(ms))) c := httptypes.MemberCollection(make([]httptypes.Member, len(ms)))
for i, m := range ms { for i, m := range ms {
@ -814,7 +815,7 @@ func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection {
return &c return &c
} }
func newMember(m *etcdserver.Member) httptypes.Member { func newMember(m *membership.Member) httptypes.Member {
tm := httptypes.Member{ tm := httptypes.Member{
ID: m.ID.String(), ID: m.ID.String(),
Name: m.Name, Name: m.Name,

View File

@ -32,6 +32,7 @@ import (
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
"github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
@ -103,7 +104,7 @@ func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error {
s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}}) s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}})
return nil return nil
} }
func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error { func (s *serverRecorder) AddMember(_ context.Context, m membership.Member) error {
s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}}) s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}})
return nil return nil
} }
@ -112,7 +113,7 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
return nil return nil
} }
func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error { func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) error {
s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}}) s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}})
return nil return nil
} }
@ -149,9 +150,9 @@ func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.R
return rs.res, nil return rs.res, nil
} }
func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } func (rs *resServer) AddMember(_ context.Context, _ membership.Member) error { return nil }
func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil } func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) error { return nil }
func (rs *resServer) ClusterVersion() *semver.Version { return nil } func (rs *resServer) ClusterVersion() *semver.Version { return nil }
func boolp(b bool) *bool { return &b } func boolp(b bool) *bool { return &b }
@ -600,11 +601,11 @@ func TestGoodParseRequest(t *testing.T) {
} }
func TestServeMembers(t *testing.T) { func TestServeMembers(t *testing.T) {
memb1 := etcdserver.Member{ID: 12, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} memb1 := membership.Member{ID: 12, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := etcdserver.Member{ID: 13, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} memb2 := membership.Member{ID: 13, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
cluster := &fakeCluster{ cluster := &fakeCluster{
id: 1, id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
} }
h := &membersHandler{ h := &membersHandler{
server: &serverRecorder{}, server: &serverRecorder{},
@ -653,11 +654,11 @@ func TestServeMembers(t *testing.T) {
// TODO: consolidate **ALL** fake server implementations and add no leader test case. // TODO: consolidate **ALL** fake server implementations and add no leader test case.
func TestServeLeader(t *testing.T) { func TestServeLeader(t *testing.T) {
memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} memb1 := membership.Member{ID: 1, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} memb2 := membership.Member{ID: 2, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
cluster := &fakeCluster{ cluster := &fakeCluster{
id: 1, id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
} }
h := &membersHandler{ h := &membersHandler{
server: &serverRecorder{}, server: &serverRecorder{},
@ -741,9 +742,9 @@ func TestServeMembersCreate(t *testing.T) {
t.Errorf("got body=%q, want %q", g, wb) t.Errorf("got body=%q, want %q", g, wb)
} }
wm := etcdserver.Member{ wm := membership.Member{
ID: 3064321551348478165, ID: 3064321551348478165,
RaftAttributes: etcdserver.RaftAttributes{ RaftAttributes: membership.RaftAttributes{
PeerURLs: []string{"http://127.0.0.1:1"}, PeerURLs: []string{"http://127.0.0.1:1"},
}, },
} }
@ -816,9 +817,9 @@ func TestServeMembersUpdate(t *testing.T) {
t.Errorf("cid = %s, want %s", gcid, wcid) t.Errorf("cid = %s, want %s", gcid, wcid)
} }
wm := etcdserver.Member{ wm := membership.Member{
ID: 1, ID: 1,
RaftAttributes: etcdserver.RaftAttributes{ RaftAttributes: membership.RaftAttributes{
PeerURLs: []string{"http://127.0.0.1:1"}, PeerURLs: []string{"http://127.0.0.1:1"},
}, },
} }
@ -913,7 +914,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}}, Header: map[string][]string{"Content-Type": {"application/json"}},
}, },
&errServer{ &errServer{
etcdserver.ErrIDExists, membership.ErrIDExists,
}, },
http.StatusConflict, http.StatusConflict,
@ -927,7 +928,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}}, Header: map[string][]string{"Content-Type": {"application/json"}},
}, },
&errServer{ &errServer{
etcdserver.ErrPeerURLexists, membership.ErrPeerURLexists,
}, },
http.StatusConflict, http.StatusConflict,
@ -951,7 +952,7 @@ func TestServeMembersFail(t *testing.T) {
Method: "DELETE", Method: "DELETE",
}, },
&errServer{ &errServer{
etcdserver.ErrIDRemoved, membership.ErrIDRemoved,
}, },
http.StatusGone, http.StatusGone,
@ -963,7 +964,7 @@ func TestServeMembersFail(t *testing.T) {
Method: "DELETE", Method: "DELETE",
}, },
&errServer{ &errServer{
etcdserver.ErrIDNotFound, membership.ErrIDNotFound,
}, },
http.StatusNotFound, http.StatusNotFound,
@ -1047,7 +1048,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}}, Header: map[string][]string{"Content-Type": {"application/json"}},
}, },
&errServer{ &errServer{
etcdserver.ErrPeerURLexists, membership.ErrPeerURLexists,
}, },
http.StatusConflict, http.StatusConflict,
@ -1061,7 +1062,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}}, Header: map[string][]string{"Content-Type": {"application/json"}},
}, },
&errServer{ &errServer{
etcdserver.ErrIDNotFound, membership.ErrIDNotFound,
}, },
http.StatusNotFound, http.StatusNotFound,
@ -1963,16 +1964,16 @@ func TestTrimPrefix(t *testing.T) {
} }
func TestNewMemberCollection(t *testing.T) { func TestNewMemberCollection(t *testing.T) {
fixture := []*etcdserver.Member{ fixture := []*membership.Member{
{ {
ID: 12, ID: 12,
Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}}, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}}, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
}, },
{ {
ID: 13, ID: 13,
Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"}}, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"}},
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"}}, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"}},
}, },
} }
got := newMemberCollection(fixture) got := newMemberCollection(fixture)
@ -1996,10 +1997,10 @@ func TestNewMemberCollection(t *testing.T) {
} }
func TestNewMember(t *testing.T) { func TestNewMember(t *testing.T) {
fixture := &etcdserver.Member{ fixture := &membership.Member{
ID: 12, ID: 12,
Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}}, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}}, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
} }
got := newMember(fixture) got := newMember(fixture)

View File

@ -23,6 +23,7 @@ import (
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
"github.com/coreos/etcd/etcdserver/auth" "github.com/coreos/etcd/etcdserver/auth"
"github.com/coreos/etcd/pkg/logutil" "github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"

View File

@ -24,6 +24,7 @@ import (
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
@ -33,20 +34,20 @@ import (
type fakeCluster struct { type fakeCluster struct {
id uint64 id uint64
clientURLs []string clientURLs []string
members map[uint64]*etcdserver.Member members map[uint64]*membership.Member
} }
func (c *fakeCluster) ID() types.ID { return types.ID(c.id) } func (c *fakeCluster) ID() types.ID { return types.ID(c.id) }
func (c *fakeCluster) ClientURLs() []string { return c.clientURLs } func (c *fakeCluster) ClientURLs() []string { return c.clientURLs }
func (c *fakeCluster) Members() []*etcdserver.Member { func (c *fakeCluster) Members() []*membership.Member {
var ms etcdserver.MembersByID var ms membership.MembersByID
for _, m := range c.members { for _, m := range c.members {
ms = append(ms, m) ms = append(ms, m)
} }
sort.Sort(ms) sort.Sort(ms)
return []*etcdserver.Member(ms) return []*membership.Member(ms)
} }
func (c *fakeCluster) Member(id types.ID) *etcdserver.Member { return c.members[uint64(id)] } func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false } func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false }
func (c *fakeCluster) Version() *semver.Version { return nil } func (c *fakeCluster) Version() *semver.Version { return nil }
@ -66,13 +67,13 @@ func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver
func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error { func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error {
return fs.err return fs.err
} }
func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { func (fs *errServer) AddMember(ctx context.Context, m membership.Member) error {
return fs.err return fs.err
} }
func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error {
return fs.err return fs.err
} }
func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error { func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) error {
return fs.err return fs.err
} }

View File

@ -22,7 +22,7 @@ import (
"path" "path"
"testing" "testing"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/rafthttp"
) )
@ -85,14 +85,14 @@ func TestServeMembersFails(t *testing.T) {
} }
func TestServeMembersGet(t *testing.T) { func TestServeMembersGet(t *testing.T) {
memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}} memb1 := membership.Member{ID: 1, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}} memb2 := membership.Member{ID: 2, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
cluster := &fakeCluster{ cluster := &fakeCluster{
id: 1, id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2}, members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
} }
h := &peerMembersHandler{cluster: cluster} h := &peerMembersHandler{cluster: cluster}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2}) msb, err := json.Marshal([]membership.Member{memb1, memb2})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/api"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -48,12 +49,12 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
} }
now := time.Now() now := time.Now()
m := etcdserver.NewMember("", urls, "", &now) m := membership.NewMember("", urls, "", &now)
err = cs.server.AddMember(ctx, *m) err = cs.server.AddMember(ctx, *m)
switch { switch {
case err == etcdserver.ErrIDExists: case err == membership.ErrIDExists:
return nil, rpctypes.ErrMemberExist return nil, rpctypes.ErrMemberExist
case err == etcdserver.ErrPeerURLexists: case err == membership.ErrPeerURLexists:
return nil, rpctypes.ErrPeerURLExist return nil, rpctypes.ErrPeerURLExist
case err != nil: case err != nil:
return nil, grpc.Errorf(codes.Internal, err.Error()) return nil, grpc.Errorf(codes.Internal, err.Error())
@ -68,9 +69,9 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) { func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
err := cs.server.RemoveMember(ctx, r.ID) err := cs.server.RemoveMember(ctx, r.ID)
switch { switch {
case err == etcdserver.ErrIDRemoved: case err == membership.ErrIDRemoved:
fallthrough fallthrough
case err == etcdserver.ErrIDNotFound: case err == membership.ErrIDNotFound:
return nil, rpctypes.ErrMemberNotFound return nil, rpctypes.ErrMemberNotFound
case err != nil: case err != nil:
return nil, grpc.Errorf(codes.Internal, err.Error()) return nil, grpc.Errorf(codes.Internal, err.Error())
@ -80,15 +81,15 @@ func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveReq
} }
func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) { func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
m := etcdserver.Member{ m := membership.Member{
ID: types.ID(r.ID), ID: types.ID(r.ID),
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: r.PeerURLs}, RaftAttributes: membership.RaftAttributes{PeerURLs: r.PeerURLs},
} }
err := cs.server.UpdateMember(ctx, m) err := cs.server.UpdateMember(ctx, m)
switch { switch {
case err == etcdserver.ErrPeerURLexists: case err == membership.ErrPeerURLexists:
return nil, rpctypes.ErrPeerURLExist return nil, rpctypes.ErrPeerURLExist
case err == etcdserver.ErrIDNotFound: case err == membership.ErrIDNotFound:
return nil, rpctypes.ErrMemberNotFound return nil, rpctypes.ErrMemberNotFound
case err != nil: case err != nil:
return nil, grpc.Errorf(codes.Internal, err.Error()) return nil, grpc.Errorf(codes.Internal, err.Error())

View File

@ -22,6 +22,7 @@ import (
"sort" "sort"
"time" "time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/version" "github.com/coreos/etcd/version"
@ -30,7 +31,7 @@ import (
// isMemberBootstrapped tries to check if the given member has been bootstrapped // isMemberBootstrapped tries to check if the given member has been bootstrapped
// in the given cluster. // in the given cluster.
func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper, timeout time.Duration) bool { func isMemberBootstrapped(cl *membership.RaftCluster, member string, rt http.RoundTripper, timeout time.Duration) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), timeout, false, rt) rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), timeout, false, rt)
if err != nil { if err != nil {
return false return false
@ -53,12 +54,12 @@ func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper, time
// response, an error is returned. // response, an error is returned.
// Each request has a 10-second timeout. Because the upper limit of TTL is 5s, // Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
// 10 second is enough for building connection and finishing request. // 10 second is enough for building connection and finishing request.
func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) { func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*membership.RaftCluster, error) {
return getClusterFromRemotePeers(urls, 10*time.Second, true, rt) return getClusterFromRemotePeers(urls, 10*time.Second, true, rt)
} }
// If logerr is true, it prints out more error messages. // If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) { func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) {
cc := &http.Client{ cc := &http.Client{
Transport: rt, Transport: rt,
Timeout: timeout, Timeout: timeout,
@ -78,7 +79,7 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool
} }
continue continue
} }
var membs []*Member var membs []*membership.Member
if err = json.Unmarshal(b, &membs); err != nil { if err = json.Unmarshal(b, &membs); err != nil {
if logerr { if logerr {
plog.Warningf("could not unmarshal cluster response: %v", err) plog.Warningf("could not unmarshal cluster response: %v", err)
@ -92,14 +93,14 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool
} }
continue continue
} }
return newClusterFromMembers("", id, membs), nil return membership.NewClusterFromMembers("", id, membs), nil
} }
return nil, fmt.Errorf("could not retrieve cluster information from the given urls") return nil, fmt.Errorf("could not retrieve cluster information from the given urls")
} }
// getRemotePeerURLs returns peer urls of remote members in the cluster. The // getRemotePeerURLs returns peer urls of remote members in the cluster. The
// returned list is sorted in ascending lexicographical order. // returned list is sorted in ascending lexicographical order.
func getRemotePeerURLs(cl *cluster, local string) []string { func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string {
us := make([]string, 0) us := make([]string, 0)
for _, m := range cl.Members() { for _, m := range cl.Members() {
if m.Name == local { if m.Name == local {
@ -115,7 +116,7 @@ func getRemotePeerURLs(cl *cluster, local string) []string {
// The key of the returned map is the member's ID. The value of the returned map // The key of the returned map is the member's ID. The value of the returned map
// is the semver versions string, including server and cluster. // is the semver versions string, including server and cluster.
// If it fails to get the version of a member, the key will be nil. // If it fails to get the version of a member, the key will be nil.
func getVersions(cl *cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { func getVersions(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
members := cl.Members() members := cl.Members()
vers := make(map[string]*version.Versions) vers := make(map[string]*version.Versions)
for _, m := range members { for _, m := range members {
@ -173,7 +174,7 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
// cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version // cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version
// out of the range. // out of the range.
// We set this rule since when the local member joins, another member might be offline. // We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(cl *cluster, local types.ID, rt http.RoundTripper) bool { func isCompatibleWithCluster(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(cl, local, rt) vers := getVersions(cl, local, rt)
minV := semver.Must(semver.NewVersion(version.MinClusterVersion)) minV := semver.Must(semver.NewVersion(version.MinClusterVersion))
maxV := semver.Must(semver.NewVersion(version.Version)) maxV := semver.Must(semver.NewVersion(version.Version))
@ -215,7 +216,7 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min
// getVersion returns the Versions of the given member via its // getVersion returns the Versions of the given member via its
// peerURLs. Returns the last error if it fails to get the version. // peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) { func getVersion(m *membership.Member, rt http.RoundTripper) (*version.Versions, error) {
cc := &http.Client{ cc := &http.Client{
Transport: rt, Transport: rt,
} }
@ -255,12 +256,3 @@ func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) {
} }
return nil, err return nil, err
} }
func MustDetectDowngrade(cv *semver.Version) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if cv != nil && lv.LessThan(*cv) {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
}
}

View File

@ -17,17 +17,11 @@ package etcdserver
import ( import (
"errors" "errors"
"fmt" "fmt"
etcdErr "github.com/coreos/etcd/error"
) )
var ( var (
ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped") ErrStopped = errors.New("etcdserver: server stopped")
ErrIDRemoved = errors.New("etcdserver: ID removed")
ErrIDExists = errors.New("etcdserver: ID exists")
ErrIDNotFound = errors.New("etcdserver: ID not found")
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
ErrCanceled = errors.New("etcdserver: request cancelled") ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out") ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
@ -38,11 +32,6 @@ var (
ErrNoSpace = errors.New("etcdserver: no space") ErrNoSpace = errors.New("etcdserver: no space")
) )
func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
}
type DiscoveryError struct { type DiscoveryError struct {
Op string Op string
Err error Err error

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package etcdserver package membership
import ( import (
"bytes" "bytes"
@ -34,13 +34,8 @@ import (
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
) )
const ( // RaftCluster is a list of Members that belong to the same raft cluster
raftAttributesSuffix = "raftAttributes" type RaftCluster struct {
attributesSuffix = "attributes"
)
// Cluster is a list of Members that belong to the same raft cluster
type cluster struct {
id types.ID id types.ID
token string token string
store store.Store store store.Store
@ -53,8 +48,8 @@ type cluster struct {
removed map[types.ID]bool removed map[types.ID]bool
} }
func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error) { func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, error) {
c := newCluster(token) c := NewCluster(token)
for name, urls := range urlsmap { for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil) m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok { if _, ok := c.members[m.ID]; ok {
@ -69,8 +64,8 @@ func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error
return c, nil return c, nil
} }
func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster { func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftCluster {
c := newCluster(token) c := NewCluster(token)
c.id = id c.id = id
for _, m := range membs { for _, m := range membs {
c.members[m.ID] = m c.members[m.ID] = m
@ -78,17 +73,17 @@ func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster
return c return c
} }
func newCluster(token string) *cluster { func NewCluster(token string) *RaftCluster {
return &cluster{ return &RaftCluster{
token: token, token: token,
members: make(map[types.ID]*Member), members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool), removed: make(map[types.ID]bool),
} }
} }
func (c *cluster) ID() types.ID { return c.id } func (c *RaftCluster) ID() types.ID { return c.id }
func (c *cluster) Members() []*Member { func (c *RaftCluster) Members() []*Member {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
var ms MembersByID var ms MembersByID
@ -99,7 +94,7 @@ func (c *cluster) Members() []*Member {
return []*Member(ms) return []*Member(ms)
} }
func (c *cluster) Member(id types.ID) *Member { func (c *RaftCluster) Member(id types.ID) *Member {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
return c.members[id].Clone() return c.members[id].Clone()
@ -107,7 +102,7 @@ func (c *cluster) Member(id types.ID) *Member {
// MemberByName returns a Member with the given name if exists. // MemberByName returns a Member with the given name if exists.
// If more than one member has the given name, it will panic. // If more than one member has the given name, it will panic.
func (c *cluster) MemberByName(name string) *Member { func (c *RaftCluster) MemberByName(name string) *Member {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
var memb *Member var memb *Member
@ -122,7 +117,7 @@ func (c *cluster) MemberByName(name string) *Member {
return memb.Clone() return memb.Clone()
} }
func (c *cluster) MemberIDs() []types.ID { func (c *RaftCluster) MemberIDs() []types.ID {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
var ids []types.ID var ids []types.ID
@ -133,7 +128,7 @@ func (c *cluster) MemberIDs() []types.ID {
return ids return ids
} }
func (c *cluster) IsIDRemoved(id types.ID) bool { func (c *RaftCluster) IsIDRemoved(id types.ID) bool {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
return c.removed[id] return c.removed[id]
@ -141,7 +136,7 @@ func (c *cluster) IsIDRemoved(id types.ID) bool {
// PeerURLs returns a list of all peer addresses. // PeerURLs returns a list of all peer addresses.
// The returned list is sorted in ascending lexicographical order. // The returned list is sorted in ascending lexicographical order.
func (c *cluster) PeerURLs() []string { func (c *RaftCluster) PeerURLs() []string {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
urls := make([]string, 0) urls := make([]string, 0)
@ -156,7 +151,7 @@ func (c *cluster) PeerURLs() []string {
// ClientURLs returns a list of all client addresses. // ClientURLs returns a list of all client addresses.
// The returned list is sorted in ascending lexicographical order. // The returned list is sorted in ascending lexicographical order.
func (c *cluster) ClientURLs() []string { func (c *RaftCluster) ClientURLs() []string {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
urls := make([]string, 0) urls := make([]string, 0)
@ -169,7 +164,7 @@ func (c *cluster) ClientURLs() []string {
return urls return urls
} }
func (c *cluster) String() string { func (c *RaftCluster) String() string {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
b := &bytes.Buffer{} b := &bytes.Buffer{}
@ -187,7 +182,7 @@ func (c *cluster) String() string {
return b.String() return b.String()
} }
func (c *cluster) genID() { func (c *RaftCluster) genID() {
mIDs := c.MemberIDs() mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs)) b := make([]byte, 8*len(mIDs))
for i, id := range mIDs { for i, id := range mIDs {
@ -197,17 +192,17 @@ func (c *cluster) genID() {
c.id = types.ID(binary.BigEndian.Uint64(hash[:8])) c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
} }
func (c *cluster) SetID(id types.ID) { c.id = id } func (c *RaftCluster) SetID(id types.ID) { c.id = id }
func (c *cluster) SetStore(st store.Store) { c.store = st } func (c *RaftCluster) SetStore(st store.Store) { c.store = st }
func (c *cluster) Recover() { func (c *RaftCluster) Recover() {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
c.members, c.removed = membersFromStore(c.store) c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store) c.version = clusterVersionFromStore(c.store)
MustDetectDowngrade(c.version) mustDetectDowngrade(c.version)
for _, m := range c.members { for _, m := range c.members {
plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id) plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
@ -219,7 +214,7 @@ func (c *cluster) Recover() {
// ValidateConfigurationChange takes a proposed ConfChange and // ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid. // ensures that it is still valid.
func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
members, removed := membersFromStore(c.store) members, removed := membersFromStore(c.store)
id := types.ID(cc.NodeID) id := types.ID(cc.NodeID)
if removed[id] { if removed[id] {
@ -280,36 +275,42 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// AddMember adds a new Member into the cluster, and saves the given member's // AddMember adds a new Member into the cluster, and saves the given member's
// raftAttributes into the store. The given member should have empty attributes. // raftAttributes into the store. The given member should have empty attributes.
// A Member with a matching id must not exist. // A Member with a matching id must not exist.
func (c *cluster) AddMember(m *Member) { func (c *RaftCluster) AddMember(m *Member) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
b, err := json.Marshal(m.RaftAttributes) if c.store != nil {
if err != nil { b, err := json.Marshal(m.RaftAttributes)
plog.Panicf("marshal raftAttributes should never fail: %v", err) if err != nil {
} plog.Panicf("marshal raftAttributes should never fail: %v", err)
p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix) }
if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
plog.Panicf("create raftAttributes should never fail: %v", err) if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create raftAttributes should never fail: %v", err)
}
} }
c.members[m.ID] = m c.members[m.ID] = m
} }
// RemoveMember removes a member from the store. // RemoveMember removes a member from the store.
// The given id MUST exist, or the function panics. // The given id MUST exist, or the function panics.
func (c *cluster) RemoveMember(id types.ID) { func (c *RaftCluster) RemoveMember(id types.ID) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { if c.store != nil {
plog.Panicf("delete member should never fail: %v", err) if _, err := c.store.Delete(MemberStoreKey(id), true, true); err != nil {
plog.Panicf("delete member should never fail: %v", err)
}
} }
delete(c.members, id) delete(c.members, id)
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { if c.store != nil {
plog.Panicf("create removedMember should never fail: %v", err) if _, err := c.store.Create(RemovedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create removedMember should never fail: %v", err)
}
} }
c.removed[id] = true c.removed[id] = true
} }
func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) bool { func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) bool {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if m, ok := c.members[id]; ok { if m, ok := c.members[id]; ok {
@ -326,21 +327,24 @@ func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) bool {
return false return false
} }
func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
b, err := json.Marshal(raftAttr)
if err != nil { if c.store != nil {
plog.Panicf("marshal raftAttributes should never fail: %v", err) b, err := json.Marshal(raftAttr)
} if err != nil {
p := path.Join(memberStoreKey(id), raftAttributesSuffix) plog.Panicf("marshal raftAttributes should never fail: %v", err)
if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { }
plog.Panicf("update raftAttributes should never fail: %v", err) p := path.Join(MemberStoreKey(id), raftAttributesSuffix)
if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("update raftAttributes should never fail: %v", err)
}
} }
c.members[id].RaftAttributes = raftAttr c.members[id].RaftAttributes = raftAttr
} }
func (c *cluster) Version() *semver.Version { func (c *RaftCluster) Version() *semver.Version {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if c.version == nil { if c.version == nil {
@ -349,7 +353,7 @@ func (c *cluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String())) return semver.Must(semver.NewVersion(c.version.String()))
} }
func (c *cluster) SetVersion(ver *semver.Version) { func (c *RaftCluster) SetVersion(ver *semver.Version) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if c.version != nil { if c.version != nil {
@ -358,10 +362,10 @@ func (c *cluster) SetVersion(ver *semver.Version) {
plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String())) plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String()))
} }
c.version = ver c.version = ver
MustDetectDowngrade(c.version) mustDetectDowngrade(c.version)
} }
func (c *cluster) isReadyToAddNewMember() bool { func (c *RaftCluster) IsReadyToAddNewMember() bool {
nmembers := 1 nmembers := 1
nstarted := 0 nstarted := 0
@ -389,7 +393,7 @@ func (c *cluster) isReadyToAddNewMember() bool {
return true return true
} }
func (c *cluster) isReadyToRemoveMember(id uint64) bool { func (c *RaftCluster) IsReadyToRemoveMember(id uint64) bool {
nmembers := 0 nmembers := 0
nstarted := 0 nstarted := 0
@ -416,7 +420,7 @@ func (c *cluster) isReadyToRemoveMember(id uint64) bool {
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) { func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
members := make(map[types.ID]*Member) members := make(map[types.ID]*Member)
removed := make(map[types.ID]bool) removed := make(map[types.ID]bool)
e, err := st.Get(storeMembersPrefix, true, true) e, err := st.Get(StoreMembersPrefix, true, true)
if err != nil { if err != nil {
if isKeyNotFound(err) { if isKeyNotFound(err) {
return members, removed return members, removed
@ -440,13 +444,13 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool)
plog.Panicf("get storeRemovedMembers should never fail: %v", err) plog.Panicf("get storeRemovedMembers should never fail: %v", err)
} }
for _, n := range e.Node.Nodes { for _, n := range e.Node.Nodes {
removed[mustParseMemberIDFromKey(n.Key)] = true removed[MustParseMemberIDFromKey(n.Key)] = true
} }
return members, removed return members, removed
} }
func clusterVersionFromStore(st store.Store) *semver.Version { func clusterVersionFromStore(st store.Store) *semver.Version {
e, err := st.Get(path.Join(StoreClusterPrefix, "version"), false, false) e, err := st.Get(path.Join(storePrefix, "version"), false, false)
if err != nil { if err != nil {
if isKeyNotFound(err) { if isKeyNotFound(err) {
return nil return nil
@ -460,7 +464,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version {
// with the existing cluster. If the validation succeeds, it assigns the IDs // with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster. // from the existing cluster to the local cluster.
// If the validation fails, an error will be returned. // If the validation fails, an error will be returned.
func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error { func ValidateClusterAndAssignIDs(local *RaftCluster, existing *RaftCluster) error {
ems := existing.Members() ems := existing.Members()
lms := local.Members() lms := local.Members()
if len(ems) != len(lms) { if len(ems) != len(lms) {
@ -481,3 +485,12 @@ func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
} }
return nil return nil
} }
func mustDetectDowngrade(cv *semver.Version) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if cv != nil && lv.LessThan(*cv) {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package etcdserver package membership
import ( import (
"encoding/json" "encoding/json"
@ -274,7 +274,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
} }
func TestClusterValidateConfigurationChange(t *testing.T) { func TestClusterValidateConfigurationChange(t *testing.T) {
cl := newCluster("") cl := NewCluster("")
cl.SetStore(store.New()) cl.SetStore(store.New())
for i := 1; i <= 4; i++ { for i := 1; i <= 4; i++ {
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}} attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
@ -457,7 +457,7 @@ func TestClusterAddMember(t *testing.T) {
{ {
Name: "Create", Name: "Create",
Params: []interface{}{ Params: []interface{}{
path.Join(storeMembersPrefix, "1", "raftAttributes"), path.Join(StoreMembersPrefix, "1", "raftAttributes"),
false, false,
`{"peerURLs":null}`, `{"peerURLs":null}`,
false, false,
@ -471,7 +471,7 @@ func TestClusterAddMember(t *testing.T) {
} }
func TestClusterMembers(t *testing.T) { func TestClusterMembers(t *testing.T) {
cls := &cluster{ cls := &RaftCluster{
members: map[types.ID]*Member{ members: map[types.ID]*Member{
1: {ID: 1}, 1: {ID: 1},
20: {ID: 20}, 20: {ID: 20},
@ -499,8 +499,8 @@ func TestClusterRemoveMember(t *testing.T) {
c.RemoveMember(1) c.RemoveMember(1)
wactions := []testutil.Action{ wactions := []testutil.Action{
{Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}}, {Name: "Delete", Params: []interface{}{MemberStoreKey(1), true, true}},
{Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}}, {Name: "Create", Params: []interface{}{RemovedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}},
} }
if !reflect.DeepEqual(st.Action(), wactions) { if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions) t.Errorf("actions = %v, want %v", st.Action(), wactions)
@ -558,8 +558,8 @@ func TestNodeToMember(t *testing.T) {
} }
} }
func newTestCluster(membs []*Member) *cluster { func newTestCluster(membs []*Member) *RaftCluster {
c := &cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} c := &RaftCluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
for _, m := range membs { for _, m := range membs {
c.members[m.ID] = m c.members[m.ID] = m
} }
@ -642,7 +642,7 @@ func TestIsReadyToAddNewMember(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
c := newTestCluster(tt.members) c := newTestCluster(tt.members)
if got := c.isReadyToAddNewMember(); got != tt.want { if got := c.IsReadyToAddNewMember(); got != tt.want {
t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want) t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want)
} }
} }
@ -727,7 +727,7 @@ func TestIsReadyToRemoveMember(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
c := newTestCluster(tt.members) c := newTestCluster(tt.members)
if got := c.isReadyToRemoveMember(tt.removeID); got != tt.want { if got := c.IsReadyToRemoveMember(tt.removeID); got != tt.want {
t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want) t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want)
} }
} }

View File

@ -0,0 +1,33 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package membership
import (
"errors"
etcdErr "github.com/coreos/etcd/error"
)
var (
ErrIDRemoved = errors.New("membership: ID removed")
ErrIDExists = errors.New("membership: ID exists")
ErrIDNotFound = errors.New("membership: ID not found")
ErrPeerURLexists = errors.New("membership: peerURL exists")
)
func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package etcdserver package membership
import ( import (
"crypto/sha1" "crypto/sha1"
@ -26,11 +26,24 @@ import (
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/coreos/pkg/capnslog"
) )
var ( var (
storeMembersPrefix = path.Join(StoreClusterPrefix, "members") plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "membership")
storeRemovedMembersPrefix = path.Join(StoreClusterPrefix, "removed_members")
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
const (
// TODO: make this private after moving all membership storage logic
// from etcdserver pkg
AttributesSuffix = "attributes"
raftAttributesSuffix = "raftAttributes"
// the prefix for stroing membership related information in store provided by store pkg.
storePrefix = "/0"
) )
// RaftAttributes represents the raft related attributes of an etcd member. // RaftAttributes represents the raft related attributes of an etcd member.
@ -110,15 +123,15 @@ func (m *Member) IsStarted() bool {
return len(m.Name) != 0 return len(m.Name) != 0
} }
func memberStoreKey(id types.ID) string { func MemberStoreKey(id types.ID) string {
return path.Join(storeMembersPrefix, id.String()) return path.Join(StoreMembersPrefix, id.String())
} }
func MemberAttributesStorePath(id types.ID) string { func MemberAttributesStorePath(id types.ID) string {
return path.Join(memberStoreKey(id), attributesSuffix) return path.Join(MemberStoreKey(id), AttributesSuffix)
} }
func mustParseMemberIDFromKey(key string) types.ID { func MustParseMemberIDFromKey(key string) types.ID {
id, err := types.IDFromString(path.Base(key)) id, err := types.IDFromString(path.Base(key))
if err != nil { if err != nil {
plog.Panicf("unexpected parse member id error: %v", err) plog.Panicf("unexpected parse member id error: %v", err)
@ -126,17 +139,17 @@ func mustParseMemberIDFromKey(key string) types.ID {
return id return id
} }
func removedMemberStoreKey(id types.ID) string { func RemovedMemberStoreKey(id types.ID) string {
return path.Join(storeRemovedMembersPrefix, id.String()) return path.Join(storeRemovedMembersPrefix, id.String())
} }
// nodeToMember builds member from a key value node. // NodeToMember builds member from a key value node.
// the child nodes of the given node MUST be sorted by key. // the child nodes of the given node MUST be sorted by key.
func nodeToMember(n *store.NodeExtern) (*Member, error) { func nodeToMember(n *store.NodeExtern) (*Member, error) {
m := &Member{ID: mustParseMemberIDFromKey(n.Key)} m := &Member{ID: MustParseMemberIDFromKey(n.Key)}
attrs := make(map[string][]byte) attrs := make(map[string][]byte)
raftAttrKey := path.Join(n.Key, raftAttributesSuffix) raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
attrKey := path.Join(n.Key, attributesSuffix) attrKey := path.Join(n.Key, AttributesSuffix)
for _, nn := range n.Nodes { for _, nn := range n.Nodes {
if nn.Key != raftAttrKey && nn.Key != attrKey { if nn.Key != raftAttrKey && nn.Key != attrKey {
return nil, fmt.Errorf("unknown key %q", nn.Key) return nil, fmt.Errorf("unknown key %q", nn.Key)

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package etcdserver package membership
import ( import (
"net/url" "net/url"

View File

@ -24,6 +24,7 @@ import (
"time" "time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/contention"
"github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
@ -279,7 +280,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) {
} }
} }
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error var err error
member := cl.MemberByName(cfg.Name) member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal( metadata := pbutil.MustMarshal(
@ -323,7 +324,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
return return
} }
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot var walsnap walpb.Snapshot
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -331,7 +332,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("") cl := membership.NewCluster("")
cl.SetID(cid) cl.SetID(cid)
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
if snapshot != nil { if snapshot != nil {
@ -357,7 +358,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
return id, cl, n, s, w return id, cl, n, s, w
} }
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot var walsnap walpb.Snapshot
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -387,7 +388,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
} }
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("") cl := membership.NewCluster("")
cl.SetID(cid) cl.SetID(cid)
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
if snapshot != nil { if snapshot != nil {
@ -473,9 +474,9 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf
next++ next++
} }
if !found { if !found {
m := Member{ m := membership.Member{
ID: types.ID(self), ID: types.ID(self),
RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}}, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
} }
ctx, err := json.Marshal(m) ctx, err := json.Marshal(m)
if err != nil { if err != nil {

View File

@ -20,6 +20,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/mock/mockstorage" "github.com/coreos/etcd/pkg/mock/mockstorage"
"github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
@ -71,9 +72,9 @@ func TestGetIDs(t *testing.T) {
} }
func TestCreateConfigChangeEnts(t *testing.T) { func TestCreateConfigChangeEnts(t *testing.T) {
m := Member{ m := membership.Member{
ID: types.ID(1), ID: types.ID(1),
RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}}, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
} }
ctx, err := json.Marshal(m) ctx, err := json.Marshal(m)
if err != nil { if err != nil {

View File

@ -33,6 +33,7 @@ import (
"github.com/coreos/etcd/discovery" "github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
@ -82,7 +83,7 @@ const (
var ( var (
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver") plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
storeMemberAttributeRegexp = regexp.MustCompile(path.Join(storeMembersPrefix, "[[:xdigit:]]{1,16}", attributesSuffix)) storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
) )
func init() { func init() {
@ -126,7 +127,7 @@ type Server interface {
// AddMember attempts to add a member into the cluster. It will return // AddMember attempts to add a member into the cluster. It will return
// ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDRemoved if member ID is removed from the cluster, or return
// ErrIDExists if member ID exists in the cluster. // ErrIDExists if member ID exists in the cluster.
AddMember(ctx context.Context, memb Member) error AddMember(ctx context.Context, memb membership.Member) error
// RemoveMember attempts to remove a member from the cluster. It will // RemoveMember attempts to remove a member from the cluster. It will
// return ErrIDRemoved if member ID is removed from the cluster, or return // return ErrIDRemoved if member ID is removed from the cluster, or return
// ErrIDNotFound if member ID is not in the cluster. // ErrIDNotFound if member ID is not in the cluster.
@ -134,7 +135,7 @@ type Server interface {
// UpdateMember attempts to update an existing member in the cluster. It will // UpdateMember attempts to update an existing member in the cluster. It will
// return ErrIDNotFound if the member ID does not exist. // return ErrIDNotFound if the member ID does not exist.
UpdateMember(ctx context.Context, updateMemb Member) error UpdateMember(ctx context.Context, updateMemb membership.Member) error
// ClusterVersion is the cluster-wide minimum major.minor version. // ClusterVersion is the cluster-wide minimum major.minor version.
// Cluster version is set to the min version that an etcd member is // Cluster version is set to the min version that an etcd member is
@ -167,9 +168,9 @@ type EtcdServer struct {
done chan struct{} done chan struct{}
errorc chan error errorc chan error
id types.ID id types.ID
attributes Attributes attributes membership.Attributes
cluster *cluster cluster *membership.RaftCluster
store store.Store store store.Store
@ -216,7 +217,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
n raft.Node n raft.Node
s *raft.MemoryStorage s *raft.MemoryStorage
id types.ID id types.ID
cl *cluster cl *membership.RaftCluster
) )
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil { if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
@ -239,13 +240,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
var remotes []*Member var remotes []*membership.Member
switch { switch {
case !haveWAL && !cfg.NewCluster: case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil { if err := cfg.VerifyJoinExisting(); err != nil {
return nil, err return nil, err
} }
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -253,7 +254,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
} }
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { if err := membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
} }
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) { if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
@ -261,7 +262,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
} }
remotes = existingCluster.Members() remotes = existingCluster.Members()
cl.SetID(existingCluster.id) cl.SetID(existingCluster.ID())
cl.SetStore(st) cl.SetStore(st)
cfg.Print() cfg.Print()
id, n, s, w = startNode(cfg, cl, nil) id, n, s, w = startNode(cfg, cl, nil)
@ -269,7 +270,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := cfg.VerifyBootstrap(); err != nil { if err := cfg.VerifyBootstrap(); err != nil {
return nil, err return nil, err
} }
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -291,7 +292,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if checkDuplicateURL(urlsmap) { if checkDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
} }
if cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil { if cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
return nil, err return nil, err
} }
} }
@ -357,7 +358,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
storage: NewStorage(w, ss), storage: NewStorage(w, ss),
}, },
id: id, id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl, cluster: cl,
stats: sstats, stats: sstats,
lstats: lstats, lstats: lstats,
@ -466,7 +467,7 @@ func (s *EtcdServer) purgeFile() {
func (s *EtcdServer) ID() types.ID { return s.id } func (s *EtcdServer) ID() types.ID { return s.id }
func (s *EtcdServer) Cluster() *cluster { return s.cluster } func (s *EtcdServer) Cluster() *membership.RaftCluster { return s.cluster }
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
@ -792,8 +793,8 @@ func (s *EtcdServer) LeaderStats() []byte {
func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
if s.cfg.StrictReconfigCheck && !s.cluster.isReadyToAddNewMember() { if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case adding a new member is allowed unconditionally // In such a case adding a new member is allowed unconditionally
return ErrNotEnoughStartedMembers return ErrNotEnoughStartedMembers
@ -813,7 +814,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
} }
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
if s.cfg.StrictReconfigCheck && !s.cluster.isReadyToRemoveMember(id) { if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case removing a member is allowed unconditionally // In such a case removing a member is allowed unconditionally
return ErrNotEnoughStartedMembers return ErrNotEnoughStartedMembers
@ -826,7 +827,7 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
return s.configure(ctx, cc) return s.configure(ctx, cc)
} }
func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) error {
b, err := json.Marshal(memb) b, err := json.Marshal(memb)
if err != nil { if err != nil {
return err return err
@ -914,7 +915,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
} }
req := pb.Request{ req := pb.Request{
Method: "PUT", Method: "PUT",
Path: MemberAttributesStorePath(s.id), Path: membership.MemberAttributesStorePath(s.id),
Val: string(b), Val: string(b),
} }
@ -1093,8 +1094,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
// TODO (yicheng): cluster should be the owner of cluster prefix store // TODO (yicheng): cluster should be the owner of cluster prefix store
// we should not modify cluster store here. // we should not modify cluster store here.
if storeMemberAttributeRegexp.MatchString(r.Path) { if storeMemberAttributeRegexp.MatchString(r.Path) {
id := mustParseMemberIDFromKey(path.Dir(r.Path)) id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
var attr Attributes var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
} }
@ -1137,7 +1138,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
*confState = *s.r.ApplyConfChange(cc) *confState = *s.r.ApplyConfChange(cc)
switch cc.Type { switch cc.Type {
case raftpb.ConfChangeAddNode: case raftpb.ConfChangeAddNode:
m := new(Member) m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil { if err := json.Unmarshal(cc.Context, m); err != nil {
plog.Panicf("unmarshal member should never fail: %v", err) plog.Panicf("unmarshal member should never fail: %v", err)
} }
@ -1161,7 +1162,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID()) plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID())
} }
case raftpb.ConfChangeUpdateNode: case raftpb.ConfChangeUpdateNode:
m := new(Member) m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil { if err := json.Unmarshal(cc.Context, m); err != nil {
plog.Panicf("unmarshal member should never fail: %v", err) plog.Panicf("unmarshal member should never fail: %v", err)
} }

View File

@ -26,6 +26,7 @@ import (
"time" "time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease"
"github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/mock/mockstorage" "github.com/coreos/etcd/pkg/mock/mockstorage"
@ -166,7 +167,7 @@ func TestApplyRepeat(t *testing.T) {
cl := newTestCluster(nil) cl := newTestCluster(nil)
st := store.New() st := store.New()
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.AddMember(&Member{ID: 1234}) cl.AddMember(&membership.Member{ID: 1234})
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
Node: n, Node: n,
@ -457,7 +458,7 @@ func TestApplyRequest(t *testing.T) {
} }
func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
cl := newTestCluster([]*Member{{ID: 1}}) cl := newTestCluster([]*membership.Member{{ID: 1}})
srv := &EtcdServer{ srv := &EtcdServer{
store: mockstore.NewRecorder(), store: mockstore.NewRecorder(),
cluster: cl, cluster: cl,
@ -465,21 +466,21 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
req := pb.Request{ req := pb.Request{
Method: "PUT", Method: "PUT",
ID: 1, ID: 1,
Path: path.Join(storeMembersPrefix, strconv.FormatUint(1, 16), attributesSuffix), Path: path.Join(membership.StoreMembersPrefix, strconv.FormatUint(1, 16), membership.AttributesSuffix),
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`, Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
} }
srv.applyRequest(req) srv.applyRequest(req)
w := Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}} w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) { if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
t.Errorf("attributes = %v, want %v", g, w) t.Errorf("attributes = %v, want %v", g, w)
} }
} }
func TestApplyConfChangeError(t *testing.T) { func TestApplyConfChangeError(t *testing.T) {
cl := newCluster("") cl := membership.NewCluster("")
cl.SetStore(store.New()) cl.SetStore(store.New())
for i := 1; i <= 4; i++ { for i := 1; i <= 4; i++ {
cl.AddMember(&Member{ID: types.ID(i)}) cl.AddMember(&membership.Member{ID: types.ID(i)})
} }
cl.RemoveMember(4) cl.RemoveMember(4)
@ -492,28 +493,28 @@ func TestApplyConfChangeError(t *testing.T) {
Type: raftpb.ConfChangeAddNode, Type: raftpb.ConfChangeAddNode,
NodeID: 4, NodeID: 4,
}, },
ErrIDRemoved, membership.ErrIDRemoved,
}, },
{ {
raftpb.ConfChange{ raftpb.ConfChange{
Type: raftpb.ConfChangeUpdateNode, Type: raftpb.ConfChangeUpdateNode,
NodeID: 4, NodeID: 4,
}, },
ErrIDRemoved, membership.ErrIDRemoved,
}, },
{ {
raftpb.ConfChange{ raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode, Type: raftpb.ConfChangeAddNode,
NodeID: 1, NodeID: 1,
}, },
ErrIDExists, membership.ErrIDExists,
}, },
{ {
raftpb.ConfChange{ raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode, Type: raftpb.ConfChangeRemoveNode,
NodeID: 5, NodeID: 5,
}, },
ErrIDNotFound, membership.ErrIDNotFound,
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -541,10 +542,10 @@ func TestApplyConfChangeError(t *testing.T) {
} }
func TestApplyConfChangeShouldStop(t *testing.T) { func TestApplyConfChangeShouldStop(t *testing.T) {
cl := newCluster("") cl := membership.NewCluster("")
cl.SetStore(store.New()) cl.SetStore(store.New())
for i := 1; i <= 3; i++ { for i := 1; i <= 3; i++ {
cl.AddMember(&Member{ID: types.ID(i)}) cl.AddMember(&membership.Member{ID: types.ID(i)})
} }
srv := &EtcdServer{ srv := &EtcdServer{
id: 1, id: 1,
@ -581,10 +582,10 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
// TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
// if the local member is removed along with other conf updates. // if the local member is removed along with other conf updates.
func TestApplyMultiConfChangeShouldStop(t *testing.T) { func TestApplyMultiConfChangeShouldStop(t *testing.T) {
cl := newCluster("") cl := membership.NewCluster("")
cl.SetStore(store.New()) cl.SetStore(store.New())
for i := 1; i <= 5; i++ { for i := 1; i <= 5; i++ {
cl.AddMember(&Member{ID: types.ID(i)}) cl.AddMember(&membership.Member{ID: types.ID(i)})
} }
srv := &EtcdServer{ srv := &EtcdServer{
id: 2, id: 2,
@ -922,8 +923,9 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
maxInFlightMsgSnap = 16 maxInFlightMsgSnap = 16
) )
n := newNopReadyNode() n := newNopReadyNode()
cl := newCluster("abc") st := store.New()
cl.SetStore(store.New()) cl := membership.NewCluster("abc")
cl.SetStore(st)
testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir") testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
if err != nil { if err != nil {
@ -946,7 +948,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
storage: mockstorage.NewStorageRecorder(testdir), storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs, raftStorage: rs,
}, },
store: cl.store, store: st,
cluster: cl, cluster: cl,
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
} }
@ -1032,7 +1034,7 @@ func TestAddMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
} }
s.start() s.start()
m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}} m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
err := s.AddMember(context.TODO(), m) err := s.AddMember(context.TODO(), m)
gaction := n.Action() gaction := n.Action()
s.Stop() s.Stop()
@ -1058,7 +1060,7 @@ func TestRemoveMember(t *testing.T) {
cl := newTestCluster(nil) cl := newTestCluster(nil)
st := store.New() st := store.New()
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.AddMember(&Member{ID: 1234}) cl.AddMember(&membership.Member{ID: 1234})
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
Node: n, Node: n,
@ -1097,7 +1099,7 @@ func TestUpdateMember(t *testing.T) {
cl := newTestCluster(nil) cl := newTestCluster(nil)
st := store.New() st := store.New()
cl.SetStore(st) cl.SetStore(st)
cl.AddMember(&Member{ID: 1234}) cl.AddMember(&membership.Member{ID: 1234})
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
Node: n, Node: n,
@ -1110,7 +1112,7 @@ func TestUpdateMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
} }
s.start() s.start()
wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
err := s.UpdateMember(context.TODO(), wm) err := s.UpdateMember(context.TODO(), wm)
gaction := n.Action() gaction := n.Action()
s.Stop() s.Stop()
@ -1139,8 +1141,8 @@ func TestPublish(t *testing.T) {
cfg: &ServerConfig{TickMs: 1}, cfg: &ServerConfig{TickMs: 1},
id: 1, id: 1,
r: raftNode{Node: n}, r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &cluster{}, cluster: &membership.RaftCluster{},
w: w, w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
} }
@ -1161,11 +1163,11 @@ func TestPublish(t *testing.T) {
if r.Method != "PUT" { if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method) t.Errorf("method = %s, want PUT", r.Method)
} }
wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}} wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
if wpath := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != wpath { if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
t.Errorf("path = %s, want %s", r.Path, wpath) t.Errorf("path = %s, want %s", r.Path, wpath)
} }
var gattr Attributes var gattr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil { if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
t.Fatalf("unmarshal val error: %v", err) t.Fatalf("unmarshal val error: %v", err)
} }
@ -1182,7 +1184,7 @@ func TestPublishStopped(t *testing.T) {
Node: newNodeNop(), Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(), transport: rafthttp.NewNopTransporter(),
}, },
cluster: &cluster{}, cluster: &membership.RaftCluster{},
w: mockwait.NewNop(), w: mockwait.NewNop(),
done: make(chan struct{}), done: make(chan struct{}),
stop: make(chan struct{}), stop: make(chan struct{}),
@ -1223,8 +1225,8 @@ func TestUpdateVersion(t *testing.T) {
id: 1, id: 1,
cfg: &ServerConfig{TickMs: 1}, cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n}, r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &cluster{}, cluster: &membership.RaftCluster{},
w: w, w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
} }
@ -1279,39 +1281,36 @@ func TestStopNotify(t *testing.T) {
func TestGetOtherPeerURLs(t *testing.T) { func TestGetOtherPeerURLs(t *testing.T) {
tests := []struct { tests := []struct {
membs []*Member membs []*membership.Member
self string
wurls []string wurls []string
}{ }{
{ {
[]*Member{ []*membership.Member{
newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
}, },
"a",
[]string{}, []string{},
}, },
{ {
[]*Member{ []*membership.Member{
newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
}, },
"a", []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
[]string{"http://10.0.0.2", "http://10.0.0.3"},
}, },
{ {
[]*Member{ []*membership.Member{
newTestMember(1, []string{"http://10.0.0.1"}, "a", nil), membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
newTestMember(3, []string{"http://10.0.0.3"}, "c", nil), membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
newTestMember(2, []string{"http://10.0.0.2"}, "b", nil), membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
}, },
"a", []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
[]string{"http://10.0.0.2", "http://10.0.0.3"},
}, },
} }
for i, tt := range tests { for i, tt := range tests {
cl := newClusterFromMembers("", types.ID(0), tt.membs) cl := membership.NewClusterFromMembers("", types.ID(0), tt.membs)
urls := getRemotePeerURLs(cl, tt.self) self := "1"
urls := getRemotePeerURLs(cl, self)
if !reflect.DeepEqual(urls, tt.wurls) { if !reflect.DeepEqual(urls, tt.wurls) {
t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls) t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
} }
@ -1440,3 +1439,11 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
} }
return nil return nil
} }
func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
c := membership.NewCluster("")
for _, m := range membs {
c.AddMember(m)
}
return c
}

View File

@ -17,13 +17,14 @@ package etcdserver
import ( import (
"time" "time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/rafthttp"
) )
// isConnectedToQuorumSince checks whether the local member is connected to the // isConnectedToQuorumSince checks whether the local member is connected to the
// quorum of the cluster since the given time. // quorum of the cluster since the given time.
func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool { func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
var connectedNum int var connectedNum int
for _, m := range members { for _, m := range members {
if m.ID == self || isConnectedSince(transport, since, m.ID) { if m.ID == self || isConnectedSince(transport, since, m.ID) {

View File

@ -53,6 +53,14 @@ func NewURLs(strs []string) (URLs, error) {
return us, nil return us, nil
} }
func MustNewURLs(strs []string) URLs {
urls, err := NewURLs(strs)
if err != nil {
panic(err)
}
return urls
}
func (us URLs) String() string { func (us URLs) String() string {
return strings.Join(us.StringSlice(), ",") return strings.Join(us.StringSlice(), ",")
} }