etcdserver: move membership related code to membership pkg

This commit is contained in:
Xiang Li 2016-04-07 12:02:37 -07:00
parent 030865abe3
commit bf2289ae00
21 changed files with 319 additions and 255 deletions

View File

@ -252,7 +252,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
// wait for cluster to start
readyC := make(chan error, cfg.clusterSize+cfg.proxySize)
readyStr := "etcdserver: set the initial cluster version to"
readyStr := "membership: set the initial cluster version to"
for i := range etcdCfgs {
go func(etcdp *etcdProcess) {
rs := readyStr

View File

@ -15,7 +15,7 @@
package api
import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/go-semver/semver"
@ -29,10 +29,10 @@ type Cluster interface {
// cluster is listening for client requests
ClientURLs() []string
// Members returns a slice of members sorted by their ID
Members() []*etcdserver.Member
Members() []*membership.Member
// Member retrieves a particular member based on ID, or nil if the
// member does not exist in the cluster
Member(id types.ID) *etcdserver.Member
Member(id types.ID) *membership.Member
// IsIDRemoved checks whether the given ID has been removed from this
// cluster at some point in the past
IsIDRemoved(id types.ID) bool

View File

@ -34,6 +34,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
"github.com/coreos/etcd/etcdserver/auth"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
@ -248,10 +249,10 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
now := h.clock.Now()
m := etcdserver.NewMember("", req.PeerURLs, "", &now)
m := membership.NewMember("", req.PeerURLs, "", &now)
err := h.server.AddMember(ctx, *m)
switch {
case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists:
case err == membership.ErrIDExists || err == membership.ErrPeerURLexists:
writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
return
case err != nil:
@ -272,9 +273,9 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
err := h.server.RemoveMember(ctx, uint64(id))
switch {
case err == etcdserver.ErrIDRemoved:
case err == membership.ErrIDRemoved:
writeError(w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
case err == etcdserver.ErrIDNotFound:
case err == membership.ErrIDNotFound:
writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
case err != nil:
plog.Errorf("error removing member %s (%v)", id, err)
@ -291,15 +292,15 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ok := unmarshalRequest(r, &req, w); !ok {
return
}
m := etcdserver.Member{
m := membership.Member{
ID: id,
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
RaftAttributes: membership.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
}
err := h.server.UpdateMember(ctx, m)
switch {
case err == etcdserver.ErrPeerURLexists:
case err == membership.ErrPeerURLexists:
writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
case err == etcdserver.ErrIDNotFound:
case err == membership.ErrIDNotFound:
writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
case err != nil:
plog.Errorf("error updating member %s (%v)", m.ID, err)
@ -804,7 +805,7 @@ func trimPrefix(p, prefix string) (s string) {
return
}
func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection {
func newMemberCollection(ms []*membership.Member) *httptypes.MemberCollection {
c := httptypes.MemberCollection(make([]httptypes.Member, len(ms)))
for i, m := range ms {
@ -814,7 +815,7 @@ func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection {
return &c
}
func newMember(m *etcdserver.Member) httptypes.Member {
func newMember(m *membership.Member) httptypes.Member {
tm := httptypes.Member{
ID: m.ID.String(),
Name: m.Name,

View File

@ -32,6 +32,7 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
@ -103,7 +104,7 @@ func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error {
s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}})
return nil
}
func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error {
func (s *serverRecorder) AddMember(_ context.Context, m membership.Member) error {
s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}})
return nil
}
@ -112,7 +113,7 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
return nil
}
func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error {
func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) error {
s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}})
return nil
}
@ -149,9 +150,9 @@ func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.R
return rs.res, nil
}
func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
func (rs *resServer) AddMember(_ context.Context, _ membership.Member) error { return nil }
func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil }
func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) error { return nil }
func (rs *resServer) ClusterVersion() *semver.Version { return nil }
func boolp(b bool) *bool { return &b }
@ -600,11 +601,11 @@ func TestGoodParseRequest(t *testing.T) {
}
func TestServeMembers(t *testing.T) {
memb1 := etcdserver.Member{ID: 12, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := etcdserver.Member{ID: 13, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
memb1 := membership.Member{ID: 12, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := membership.Member{ID: 13, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
cluster := &fakeCluster{
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
}
h := &membersHandler{
server: &serverRecorder{},
@ -653,11 +654,11 @@ func TestServeMembers(t *testing.T) {
// TODO: consolidate **ALL** fake server implementations and add no leader test case.
func TestServeLeader(t *testing.T) {
memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
memb1 := membership.Member{ID: 1, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := membership.Member{ID: 2, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
cluster := &fakeCluster{
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
}
h := &membersHandler{
server: &serverRecorder{},
@ -741,9 +742,9 @@ func TestServeMembersCreate(t *testing.T) {
t.Errorf("got body=%q, want %q", g, wb)
}
wm := etcdserver.Member{
wm := membership.Member{
ID: 3064321551348478165,
RaftAttributes: etcdserver.RaftAttributes{
RaftAttributes: membership.RaftAttributes{
PeerURLs: []string{"http://127.0.0.1:1"},
},
}
@ -816,9 +817,9 @@ func TestServeMembersUpdate(t *testing.T) {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
wm := etcdserver.Member{
wm := membership.Member{
ID: 1,
RaftAttributes: etcdserver.RaftAttributes{
RaftAttributes: membership.RaftAttributes{
PeerURLs: []string{"http://127.0.0.1:1"},
},
}
@ -913,7 +914,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
etcdserver.ErrIDExists,
membership.ErrIDExists,
},
http.StatusConflict,
@ -927,7 +928,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
etcdserver.ErrPeerURLexists,
membership.ErrPeerURLexists,
},
http.StatusConflict,
@ -951,7 +952,7 @@ func TestServeMembersFail(t *testing.T) {
Method: "DELETE",
},
&errServer{
etcdserver.ErrIDRemoved,
membership.ErrIDRemoved,
},
http.StatusGone,
@ -963,7 +964,7 @@ func TestServeMembersFail(t *testing.T) {
Method: "DELETE",
},
&errServer{
etcdserver.ErrIDNotFound,
membership.ErrIDNotFound,
},
http.StatusNotFound,
@ -1047,7 +1048,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
etcdserver.ErrPeerURLexists,
membership.ErrPeerURLexists,
},
http.StatusConflict,
@ -1061,7 +1062,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
etcdserver.ErrIDNotFound,
membership.ErrIDNotFound,
},
http.StatusNotFound,
@ -1963,16 +1964,16 @@ func TestTrimPrefix(t *testing.T) {
}
func TestNewMemberCollection(t *testing.T) {
fixture := []*etcdserver.Member{
fixture := []*membership.Member{
{
ID: 12,
Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
},
{
ID: 13,
Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"}},
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"}},
Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:9090", "http://localhost:9091"}},
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:9092", "http://localhost:9093"}},
},
}
got := newMemberCollection(fixture)
@ -1996,10 +1997,10 @@ func TestNewMemberCollection(t *testing.T) {
}
func TestNewMember(t *testing.T) {
fixture := &etcdserver.Member{
fixture := &membership.Member{
ID: 12,
Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080", "http://localhost:8081"}},
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:8082", "http://localhost:8083"}},
}
got := newMember(fixture)

View File

@ -23,6 +23,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
"github.com/coreos/etcd/etcdserver/auth"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/pkg/capnslog"

View File

@ -24,6 +24,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/go-semver/semver"
@ -33,20 +34,20 @@ import (
type fakeCluster struct {
id uint64
clientURLs []string
members map[uint64]*etcdserver.Member
members map[uint64]*membership.Member
}
func (c *fakeCluster) ID() types.ID { return types.ID(c.id) }
func (c *fakeCluster) ClientURLs() []string { return c.clientURLs }
func (c *fakeCluster) Members() []*etcdserver.Member {
var ms etcdserver.MembersByID
func (c *fakeCluster) Members() []*membership.Member {
var ms membership.MembersByID
for _, m := range c.members {
ms = append(ms, m)
}
sort.Sort(ms)
return []*etcdserver.Member(ms)
return []*membership.Member(ms)
}
func (c *fakeCluster) Member(id types.ID) *etcdserver.Member { return c.members[uint64(id)] }
func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false }
func (c *fakeCluster) Version() *semver.Version { return nil }
@ -66,13 +67,13 @@ func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver
func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error {
return fs.err
}
func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error {
func (fs *errServer) AddMember(ctx context.Context, m membership.Member) error {
return fs.err
}
func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error {
return fs.err
}
func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error {
func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) error {
return fs.err
}

View File

@ -22,7 +22,7 @@ import (
"path"
"testing"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/rafthttp"
)
@ -85,14 +85,14 @@ func TestServeMembersFails(t *testing.T) {
}
func TestServeMembersGet(t *testing.T) {
memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
memb1 := membership.Member{ID: 1, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
memb2 := membership.Member{ID: 2, Attributes: membership.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
cluster := &fakeCluster{
id: 1,
members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
members: map[uint64]*membership.Member{1: &memb1, 2: &memb2},
}
h := &peerMembersHandler{cluster: cluster}
msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
msb, err := json.Marshal([]membership.Member{memb1, memb2})
if err != nil {
t.Fatal(err)
}

View File

@ -21,6 +21,7 @@ import (
"github.com/coreos/etcd/etcdserver/api"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -48,12 +49,12 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
}
now := time.Now()
m := etcdserver.NewMember("", urls, "", &now)
m := membership.NewMember("", urls, "", &now)
err = cs.server.AddMember(ctx, *m)
switch {
case err == etcdserver.ErrIDExists:
case err == membership.ErrIDExists:
return nil, rpctypes.ErrMemberExist
case err == etcdserver.ErrPeerURLexists:
case err == membership.ErrPeerURLexists:
return nil, rpctypes.ErrPeerURLExist
case err != nil:
return nil, grpc.Errorf(codes.Internal, err.Error())
@ -68,9 +69,9 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
err := cs.server.RemoveMember(ctx, r.ID)
switch {
case err == etcdserver.ErrIDRemoved:
case err == membership.ErrIDRemoved:
fallthrough
case err == etcdserver.ErrIDNotFound:
case err == membership.ErrIDNotFound:
return nil, rpctypes.ErrMemberNotFound
case err != nil:
return nil, grpc.Errorf(codes.Internal, err.Error())
@ -80,15 +81,15 @@ func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveReq
}
func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
m := etcdserver.Member{
m := membership.Member{
ID: types.ID(r.ID),
RaftAttributes: etcdserver.RaftAttributes{PeerURLs: r.PeerURLs},
RaftAttributes: membership.RaftAttributes{PeerURLs: r.PeerURLs},
}
err := cs.server.UpdateMember(ctx, m)
switch {
case err == etcdserver.ErrPeerURLexists:
case err == membership.ErrPeerURLexists:
return nil, rpctypes.ErrPeerURLExist
case err == etcdserver.ErrIDNotFound:
case err == membership.ErrIDNotFound:
return nil, rpctypes.ErrMemberNotFound
case err != nil:
return nil, grpc.Errorf(codes.Internal, err.Error())

View File

@ -22,6 +22,7 @@ import (
"sort"
"time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/version"
@ -30,7 +31,7 @@ import (
// isMemberBootstrapped tries to check if the given member has been bootstrapped
// in the given cluster.
func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper, timeout time.Duration) bool {
func isMemberBootstrapped(cl *membership.RaftCluster, member string, rt http.RoundTripper, timeout time.Duration) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), timeout, false, rt)
if err != nil {
return false
@ -53,12 +54,12 @@ func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper, time
// response, an error is returned.
// Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
// 10 second is enough for building connection and finishing request.
func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) {
func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*membership.RaftCluster, error) {
return getClusterFromRemotePeers(urls, 10*time.Second, true, rt)
}
// If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) {
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) {
cc := &http.Client{
Transport: rt,
Timeout: timeout,
@ -78,7 +79,7 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool
}
continue
}
var membs []*Member
var membs []*membership.Member
if err = json.Unmarshal(b, &membs); err != nil {
if logerr {
plog.Warningf("could not unmarshal cluster response: %v", err)
@ -92,14 +93,14 @@ func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool
}
continue
}
return newClusterFromMembers("", id, membs), nil
return membership.NewClusterFromMembers("", id, membs), nil
}
return nil, fmt.Errorf("could not retrieve cluster information from the given urls")
}
// getRemotePeerURLs returns peer urls of remote members in the cluster. The
// returned list is sorted in ascending lexicographical order.
func getRemotePeerURLs(cl *cluster, local string) []string {
func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string {
us := make([]string, 0)
for _, m := range cl.Members() {
if m.Name == local {
@ -115,7 +116,7 @@ func getRemotePeerURLs(cl *cluster, local string) []string {
// The key of the returned map is the member's ID. The value of the returned map
// is the semver versions string, including server and cluster.
// If it fails to get the version of a member, the key will be nil.
func getVersions(cl *cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
func getVersions(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
members := cl.Members()
vers := make(map[string]*version.Versions)
for _, m := range members {
@ -173,7 +174,7 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
// cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version
// out of the range.
// We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(cl *cluster, local types.ID, rt http.RoundTripper) bool {
func isCompatibleWithCluster(cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(cl, local, rt)
minV := semver.Must(semver.NewVersion(version.MinClusterVersion))
maxV := semver.Must(semver.NewVersion(version.Version))
@ -215,7 +216,7 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min
// getVersion returns the Versions of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) {
func getVersion(m *membership.Member, rt http.RoundTripper) (*version.Versions, error) {
cc := &http.Client{
Transport: rt,
}
@ -255,12 +256,3 @@ func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) {
}
return nil, err
}
func MustDetectDowngrade(cv *semver.Version) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if cv != nil && lv.LessThan(*cv) {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
}
}

View File

@ -17,17 +17,11 @@ package etcdserver
import (
"errors"
"fmt"
etcdErr "github.com/coreos/etcd/error"
)
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrIDRemoved = errors.New("etcdserver: ID removed")
ErrIDExists = errors.New("etcdserver: ID exists")
ErrIDNotFound = errors.New("etcdserver: ID not found")
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
@ -38,11 +32,6 @@ var (
ErrNoSpace = errors.New("etcdserver: no space")
)
func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
}
type DiscoveryError struct {
Op string
Err error

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package membership
import (
"bytes"
@ -34,13 +34,8 @@ import (
"github.com/coreos/go-semver/semver"
)
const (
raftAttributesSuffix = "raftAttributes"
attributesSuffix = "attributes"
)
// Cluster is a list of Members that belong to the same raft cluster
type cluster struct {
// RaftCluster is a list of Members that belong to the same raft cluster
type RaftCluster struct {
id types.ID
token string
store store.Store
@ -53,8 +48,8 @@ type cluster struct {
removed map[types.ID]bool
}
func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error) {
c := newCluster(token)
func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, error) {
c := NewCluster(token)
for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
@ -69,8 +64,8 @@ func newClusterFromURLsMap(token string, urlsmap types.URLsMap) (*cluster, error
return c, nil
}
func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster {
c := newCluster(token)
func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftCluster {
c := NewCluster(token)
c.id = id
for _, m := range membs {
c.members[m.ID] = m
@ -78,17 +73,17 @@ func newClusterFromMembers(token string, id types.ID, membs []*Member) *cluster
return c
}
func newCluster(token string) *cluster {
return &cluster{
func NewCluster(token string) *RaftCluster {
return &RaftCluster{
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
}
}
func (c *cluster) ID() types.ID { return c.id }
func (c *RaftCluster) ID() types.ID { return c.id }
func (c *cluster) Members() []*Member {
func (c *RaftCluster) Members() []*Member {
c.Lock()
defer c.Unlock()
var ms MembersByID
@ -99,7 +94,7 @@ func (c *cluster) Members() []*Member {
return []*Member(ms)
}
func (c *cluster) Member(id types.ID) *Member {
func (c *RaftCluster) Member(id types.ID) *Member {
c.Lock()
defer c.Unlock()
return c.members[id].Clone()
@ -107,7 +102,7 @@ func (c *cluster) Member(id types.ID) *Member {
// MemberByName returns a Member with the given name if exists.
// If more than one member has the given name, it will panic.
func (c *cluster) MemberByName(name string) *Member {
func (c *RaftCluster) MemberByName(name string) *Member {
c.Lock()
defer c.Unlock()
var memb *Member
@ -122,7 +117,7 @@ func (c *cluster) MemberByName(name string) *Member {
return memb.Clone()
}
func (c *cluster) MemberIDs() []types.ID {
func (c *RaftCluster) MemberIDs() []types.ID {
c.Lock()
defer c.Unlock()
var ids []types.ID
@ -133,7 +128,7 @@ func (c *cluster) MemberIDs() []types.ID {
return ids
}
func (c *cluster) IsIDRemoved(id types.ID) bool {
func (c *RaftCluster) IsIDRemoved(id types.ID) bool {
c.Lock()
defer c.Unlock()
return c.removed[id]
@ -141,7 +136,7 @@ func (c *cluster) IsIDRemoved(id types.ID) bool {
// PeerURLs returns a list of all peer addresses.
// The returned list is sorted in ascending lexicographical order.
func (c *cluster) PeerURLs() []string {
func (c *RaftCluster) PeerURLs() []string {
c.Lock()
defer c.Unlock()
urls := make([]string, 0)
@ -156,7 +151,7 @@ func (c *cluster) PeerURLs() []string {
// ClientURLs returns a list of all client addresses.
// The returned list is sorted in ascending lexicographical order.
func (c *cluster) ClientURLs() []string {
func (c *RaftCluster) ClientURLs() []string {
c.Lock()
defer c.Unlock()
urls := make([]string, 0)
@ -169,7 +164,7 @@ func (c *cluster) ClientURLs() []string {
return urls
}
func (c *cluster) String() string {
func (c *RaftCluster) String() string {
c.Lock()
defer c.Unlock()
b := &bytes.Buffer{}
@ -187,7 +182,7 @@ func (c *cluster) String() string {
return b.String()
}
func (c *cluster) genID() {
func (c *RaftCluster) genID() {
mIDs := c.MemberIDs()
b := make([]byte, 8*len(mIDs))
for i, id := range mIDs {
@ -197,17 +192,17 @@ func (c *cluster) genID() {
c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
}
func (c *cluster) SetID(id types.ID) { c.id = id }
func (c *RaftCluster) SetID(id types.ID) { c.id = id }
func (c *cluster) SetStore(st store.Store) { c.store = st }
func (c *RaftCluster) SetStore(st store.Store) { c.store = st }
func (c *cluster) Recover() {
func (c *RaftCluster) Recover() {
c.Lock()
defer c.Unlock()
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
MustDetectDowngrade(c.version)
mustDetectDowngrade(c.version)
for _, m := range c.members {
plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
@ -219,7 +214,7 @@ func (c *cluster) Recover() {
// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
members, removed := membersFromStore(c.store)
id := types.ID(cc.NodeID)
if removed[id] {
@ -280,36 +275,42 @@ func (c *cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// AddMember adds a new Member into the cluster, and saves the given member's
// raftAttributes into the store. The given member should have empty attributes.
// A Member with a matching id must not exist.
func (c *cluster) AddMember(m *Member) {
func (c *RaftCluster) AddMember(m *Member) {
c.Lock()
defer c.Unlock()
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create raftAttributes should never fail: %v", err)
if c.store != nil {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create raftAttributes should never fail: %v", err)
}
}
c.members[m.ID] = m
}
// RemoveMember removes a member from the store.
// The given id MUST exist, or the function panics.
func (c *cluster) RemoveMember(id types.ID) {
func (c *RaftCluster) RemoveMember(id types.ID) {
c.Lock()
defer c.Unlock()
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
plog.Panicf("delete member should never fail: %v", err)
if c.store != nil {
if _, err := c.store.Delete(MemberStoreKey(id), true, true); err != nil {
plog.Panicf("delete member should never fail: %v", err)
}
}
delete(c.members, id)
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create removedMember should never fail: %v", err)
if c.store != nil {
if _, err := c.store.Create(RemovedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("create removedMember should never fail: %v", err)
}
}
c.removed[id] = true
}
func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) bool {
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) bool {
c.Lock()
defer c.Unlock()
if m, ok := c.members[id]; ok {
@ -326,21 +327,24 @@ func (c *cluster) UpdateAttributes(id types.ID, attr Attributes) bool {
return false
}
func (c *cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.Lock()
defer c.Unlock()
b, err := json.Marshal(raftAttr)
if err != nil {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(memberStoreKey(id), raftAttributesSuffix)
if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("update raftAttributes should never fail: %v", err)
if c.store != nil {
b, err := json.Marshal(raftAttr)
if err != nil {
plog.Panicf("marshal raftAttributes should never fail: %v", err)
}
p := path.Join(MemberStoreKey(id), raftAttributesSuffix)
if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
plog.Panicf("update raftAttributes should never fail: %v", err)
}
}
c.members[id].RaftAttributes = raftAttr
}
func (c *cluster) Version() *semver.Version {
func (c *RaftCluster) Version() *semver.Version {
c.Lock()
defer c.Unlock()
if c.version == nil {
@ -349,7 +353,7 @@ func (c *cluster) Version() *semver.Version {
return semver.Must(semver.NewVersion(c.version.String()))
}
func (c *cluster) SetVersion(ver *semver.Version) {
func (c *RaftCluster) SetVersion(ver *semver.Version) {
c.Lock()
defer c.Unlock()
if c.version != nil {
@ -358,10 +362,10 @@ func (c *cluster) SetVersion(ver *semver.Version) {
plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String()))
}
c.version = ver
MustDetectDowngrade(c.version)
mustDetectDowngrade(c.version)
}
func (c *cluster) isReadyToAddNewMember() bool {
func (c *RaftCluster) IsReadyToAddNewMember() bool {
nmembers := 1
nstarted := 0
@ -389,7 +393,7 @@ func (c *cluster) isReadyToAddNewMember() bool {
return true
}
func (c *cluster) isReadyToRemoveMember(id uint64) bool {
func (c *RaftCluster) IsReadyToRemoveMember(id uint64) bool {
nmembers := 0
nstarted := 0
@ -416,7 +420,7 @@ func (c *cluster) isReadyToRemoveMember(id uint64) bool {
func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
members := make(map[types.ID]*Member)
removed := make(map[types.ID]bool)
e, err := st.Get(storeMembersPrefix, true, true)
e, err := st.Get(StoreMembersPrefix, true, true)
if err != nil {
if isKeyNotFound(err) {
return members, removed
@ -440,13 +444,13 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool)
plog.Panicf("get storeRemovedMembers should never fail: %v", err)
}
for _, n := range e.Node.Nodes {
removed[mustParseMemberIDFromKey(n.Key)] = true
removed[MustParseMemberIDFromKey(n.Key)] = true
}
return members, removed
}
func clusterVersionFromStore(st store.Store) *semver.Version {
e, err := st.Get(path.Join(StoreClusterPrefix, "version"), false, false)
e, err := st.Get(path.Join(storePrefix, "version"), false, false)
if err != nil {
if isKeyNotFound(err) {
return nil
@ -460,7 +464,7 @@ func clusterVersionFromStore(st store.Store) *semver.Version {
// with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster.
// If the validation fails, an error will be returned.
func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
func ValidateClusterAndAssignIDs(local *RaftCluster, existing *RaftCluster) error {
ems := existing.Members()
lms := local.Members()
if len(ems) != len(lms) {
@ -481,3 +485,12 @@ func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
}
return nil
}
func mustDetectDowngrade(cv *semver.Version) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if cv != nil && lv.LessThan(*cv) {
plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package membership
import (
"encoding/json"
@ -274,7 +274,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
}
func TestClusterValidateConfigurationChange(t *testing.T) {
cl := newCluster("")
cl := NewCluster("")
cl.SetStore(store.New())
for i := 1; i <= 4; i++ {
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
@ -457,7 +457,7 @@ func TestClusterAddMember(t *testing.T) {
{
Name: "Create",
Params: []interface{}{
path.Join(storeMembersPrefix, "1", "raftAttributes"),
path.Join(StoreMembersPrefix, "1", "raftAttributes"),
false,
`{"peerURLs":null}`,
false,
@ -471,7 +471,7 @@ func TestClusterAddMember(t *testing.T) {
}
func TestClusterMembers(t *testing.T) {
cls := &cluster{
cls := &RaftCluster{
members: map[types.ID]*Member{
1: {ID: 1},
20: {ID: 20},
@ -499,8 +499,8 @@ func TestClusterRemoveMember(t *testing.T) {
c.RemoveMember(1)
wactions := []testutil.Action{
{Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}},
{Name: "Create", Params: []interface{}{removedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}},
{Name: "Delete", Params: []interface{}{MemberStoreKey(1), true, true}},
{Name: "Create", Params: []interface{}{RemovedMemberStoreKey(1), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}}},
}
if !reflect.DeepEqual(st.Action(), wactions) {
t.Errorf("actions = %v, want %v", st.Action(), wactions)
@ -558,8 +558,8 @@ func TestNodeToMember(t *testing.T) {
}
}
func newTestCluster(membs []*Member) *cluster {
c := &cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
func newTestCluster(membs []*Member) *RaftCluster {
c := &RaftCluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
for _, m := range membs {
c.members[m.ID] = m
}
@ -642,7 +642,7 @@ func TestIsReadyToAddNewMember(t *testing.T) {
}
for i, tt := range tests {
c := newTestCluster(tt.members)
if got := c.isReadyToAddNewMember(); got != tt.want {
if got := c.IsReadyToAddNewMember(); got != tt.want {
t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want)
}
}
@ -727,7 +727,7 @@ func TestIsReadyToRemoveMember(t *testing.T) {
}
for i, tt := range tests {
c := newTestCluster(tt.members)
if got := c.isReadyToRemoveMember(tt.removeID); got != tt.want {
if got := c.IsReadyToRemoveMember(tt.removeID); got != tt.want {
t.Errorf("%d: isReadyToAddNewMember returned %t, want %t", i, got, tt.want)
}
}

View File

@ -0,0 +1,33 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package membership
import (
"errors"
etcdErr "github.com/coreos/etcd/error"
)
var (
ErrIDRemoved = errors.New("membership: ID removed")
ErrIDExists = errors.New("membership: ID exists")
ErrIDNotFound = errors.New("membership: ID not found")
ErrPeerURLexists = errors.New("membership: peerURL exists")
)
func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package membership
import (
"crypto/sha1"
@ -26,11 +26,24 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/store"
"github.com/coreos/pkg/capnslog"
)
var (
storeMembersPrefix = path.Join(StoreClusterPrefix, "members")
storeRemovedMembersPrefix = path.Join(StoreClusterPrefix, "removed_members")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "membership")
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
const (
// TODO: make this private after moving all membership storage logic
// from etcdserver pkg
AttributesSuffix = "attributes"
raftAttributesSuffix = "raftAttributes"
// the prefix for stroing membership related information in store provided by store pkg.
storePrefix = "/0"
)
// RaftAttributes represents the raft related attributes of an etcd member.
@ -110,15 +123,15 @@ func (m *Member) IsStarted() bool {
return len(m.Name) != 0
}
func memberStoreKey(id types.ID) string {
return path.Join(storeMembersPrefix, id.String())
func MemberStoreKey(id types.ID) string {
return path.Join(StoreMembersPrefix, id.String())
}
func MemberAttributesStorePath(id types.ID) string {
return path.Join(memberStoreKey(id), attributesSuffix)
return path.Join(MemberStoreKey(id), AttributesSuffix)
}
func mustParseMemberIDFromKey(key string) types.ID {
func MustParseMemberIDFromKey(key string) types.ID {
id, err := types.IDFromString(path.Base(key))
if err != nil {
plog.Panicf("unexpected parse member id error: %v", err)
@ -126,17 +139,17 @@ func mustParseMemberIDFromKey(key string) types.ID {
return id
}
func removedMemberStoreKey(id types.ID) string {
func RemovedMemberStoreKey(id types.ID) string {
return path.Join(storeRemovedMembersPrefix, id.String())
}
// nodeToMember builds member from a key value node.
// NodeToMember builds member from a key value node.
// the child nodes of the given node MUST be sorted by key.
func nodeToMember(n *store.NodeExtern) (*Member, error) {
m := &Member{ID: mustParseMemberIDFromKey(n.Key)}
m := &Member{ID: MustParseMemberIDFromKey(n.Key)}
attrs := make(map[string][]byte)
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
attrKey := path.Join(n.Key, attributesSuffix)
attrKey := path.Join(n.Key, AttributesSuffix)
for _, nn := range n.Nodes {
if nn.Key != raftAttrKey && nn.Key != attrKey {
return nil, fmt.Errorf("unknown key %q", nn.Key)

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package membership
import (
"net/url"

View File

@ -24,6 +24,7 @@ import (
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/contention"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
@ -279,7 +280,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) {
}
}
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
@ -323,7 +324,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
return
}
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -331,7 +332,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
@ -357,7 +358,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
return id, cl, n, s, w
}
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -387,7 +388,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
}
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := newCluster("")
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
@ -473,9 +474,9 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf
next++
}
if !found {
m := Member{
m := membership.Member{
ID: types.ID(self),
RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
}
ctx, err := json.Marshal(m)
if err != nil {

View File

@ -20,6 +20,7 @@ import (
"testing"
"time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/mock/mockstorage"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
@ -71,9 +72,9 @@ func TestGetIDs(t *testing.T) {
}
func TestCreateConfigChangeEnts(t *testing.T) {
m := Member{
m := membership.Member{
ID: types.ID(1),
RaftAttributes: RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:7001", "http://localhost:2380"}},
}
ctx, err := json.Marshal(m)
if err != nil {

View File

@ -33,6 +33,7 @@ import (
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/pkg/fileutil"
@ -82,7 +83,7 @@ const (
var (
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
storeMemberAttributeRegexp = regexp.MustCompile(path.Join(storeMembersPrefix, "[[:xdigit:]]{1,16}", attributesSuffix))
storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
)
func init() {
@ -126,7 +127,7 @@ type Server interface {
// AddMember attempts to add a member into the cluster. It will return
// ErrIDRemoved if member ID is removed from the cluster, or return
// ErrIDExists if member ID exists in the cluster.
AddMember(ctx context.Context, memb Member) error
AddMember(ctx context.Context, memb membership.Member) error
// RemoveMember attempts to remove a member from the cluster. It will
// return ErrIDRemoved if member ID is removed from the cluster, or return
// ErrIDNotFound if member ID is not in the cluster.
@ -134,7 +135,7 @@ type Server interface {
// UpdateMember attempts to update an existing member in the cluster. It will
// return ErrIDNotFound if the member ID does not exist.
UpdateMember(ctx context.Context, updateMemb Member) error
UpdateMember(ctx context.Context, updateMemb membership.Member) error
// ClusterVersion is the cluster-wide minimum major.minor version.
// Cluster version is set to the min version that an etcd member is
@ -167,9 +168,9 @@ type EtcdServer struct {
done chan struct{}
errorc chan error
id types.ID
attributes Attributes
attributes membership.Attributes
cluster *cluster
cluster *membership.RaftCluster
store store.Store
@ -216,7 +217,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
n raft.Node
s *raft.MemoryStorage
id types.ID
cl *cluster
cl *membership.RaftCluster
)
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
@ -239,13 +240,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil {
return nil, err
}
var remotes []*Member
var remotes []*membership.Member
switch {
case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
@ -253,7 +254,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
}
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
if err := membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
@ -261,7 +262,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
remotes = existingCluster.Members()
cl.SetID(existingCluster.id)
cl.SetID(existingCluster.ID())
cl.SetStore(st)
cfg.Print()
id, n, s, w = startNode(cfg, cl, nil)
@ -269,7 +270,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := cfg.VerifyBootstrap(); err != nil {
return nil, err
}
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
@ -291,7 +292,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if checkDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
}
if cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
if cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
return nil, err
}
}
@ -357,7 +358,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
storage: NewStorage(w, ss),
},
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
stats: sstats,
lstats: lstats,
@ -466,7 +467,7 @@ func (s *EtcdServer) purgeFile() {
func (s *EtcdServer) ID() types.ID { return s.id }
func (s *EtcdServer) Cluster() *cluster { return s.cluster }
func (s *EtcdServer) Cluster() *membership.RaftCluster { return s.cluster }
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
@ -792,8 +793,8 @@ func (s *EtcdServer) LeaderStats() []byte {
func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
if s.cfg.StrictReconfigCheck && !s.cluster.isReadyToAddNewMember() {
func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case adding a new member is allowed unconditionally
return ErrNotEnoughStartedMembers
@ -813,7 +814,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
}
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
if s.cfg.StrictReconfigCheck && !s.cluster.isReadyToRemoveMember(id) {
if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
// In such a case removing a member is allowed unconditionally
return ErrNotEnoughStartedMembers
@ -826,7 +827,7 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
return s.configure(ctx, cc)
}
func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error {
func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) error {
b, err := json.Marshal(memb)
if err != nil {
return err
@ -914,7 +915,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
}
req := pb.Request{
Method: "PUT",
Path: MemberAttributesStorePath(s.id),
Path: membership.MemberAttributesStorePath(s.id),
Val: string(b),
}
@ -1093,8 +1094,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
// TODO (yicheng): cluster should be the owner of cluster prefix store
// we should not modify cluster store here.
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := mustParseMemberIDFromKey(path.Dir(r.Path))
var attr Attributes
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
@ -1137,7 +1138,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
*confState = *s.r.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(Member)
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
plog.Panicf("unmarshal member should never fail: %v", err)
}
@ -1161,7 +1162,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
plog.Noticef("removed member %s from cluster %s", id, s.cluster.ID())
}
case raftpb.ConfChangeUpdateNode:
m := new(Member)
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
plog.Panicf("unmarshal member should never fail: %v", err)
}

View File

@ -26,6 +26,7 @@ import (
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/mock/mockstorage"
@ -166,7 +167,7 @@ func TestApplyRepeat(t *testing.T) {
cl := newTestCluster(nil)
st := store.New()
cl.SetStore(store.New())
cl.AddMember(&Member{ID: 1234})
cl.AddMember(&membership.Member{ID: 1234})
s := &EtcdServer{
r: raftNode{
Node: n,
@ -457,7 +458,7 @@ func TestApplyRequest(t *testing.T) {
}
func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
cl := newTestCluster([]*Member{{ID: 1}})
cl := newTestCluster([]*membership.Member{{ID: 1}})
srv := &EtcdServer{
store: mockstore.NewRecorder(),
cluster: cl,
@ -465,21 +466,21 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
req := pb.Request{
Method: "PUT",
ID: 1,
Path: path.Join(storeMembersPrefix, strconv.FormatUint(1, 16), attributesSuffix),
Path: path.Join(membership.StoreMembersPrefix, strconv.FormatUint(1, 16), membership.AttributesSuffix),
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
}
srv.applyRequest(req)
w := Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
t.Errorf("attributes = %v, want %v", g, w)
}
}
func TestApplyConfChangeError(t *testing.T) {
cl := newCluster("")
cl := membership.NewCluster("")
cl.SetStore(store.New())
for i := 1; i <= 4; i++ {
cl.AddMember(&Member{ID: types.ID(i)})
cl.AddMember(&membership.Member{ID: types.ID(i)})
}
cl.RemoveMember(4)
@ -492,28 +493,28 @@ func TestApplyConfChangeError(t *testing.T) {
Type: raftpb.ConfChangeAddNode,
NodeID: 4,
},
ErrIDRemoved,
membership.ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeUpdateNode,
NodeID: 4,
},
ErrIDRemoved,
membership.ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 1,
},
ErrIDExists,
membership.ErrIDExists,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: 5,
},
ErrIDNotFound,
membership.ErrIDNotFound,
},
}
for i, tt := range tests {
@ -541,10 +542,10 @@ func TestApplyConfChangeError(t *testing.T) {
}
func TestApplyConfChangeShouldStop(t *testing.T) {
cl := newCluster("")
cl := membership.NewCluster("")
cl.SetStore(store.New())
for i := 1; i <= 3; i++ {
cl.AddMember(&Member{ID: types.ID(i)})
cl.AddMember(&membership.Member{ID: types.ID(i)})
}
srv := &EtcdServer{
id: 1,
@ -581,10 +582,10 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
// TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
// if the local member is removed along with other conf updates.
func TestApplyMultiConfChangeShouldStop(t *testing.T) {
cl := newCluster("")
cl := membership.NewCluster("")
cl.SetStore(store.New())
for i := 1; i <= 5; i++ {
cl.AddMember(&Member{ID: types.ID(i)})
cl.AddMember(&membership.Member{ID: types.ID(i)})
}
srv := &EtcdServer{
id: 2,
@ -922,8 +923,9 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
maxInFlightMsgSnap = 16
)
n := newNopReadyNode()
cl := newCluster("abc")
cl.SetStore(store.New())
st := store.New()
cl := membership.NewCluster("abc")
cl.SetStore(st)
testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
if err != nil {
@ -946,7 +948,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs,
},
store: cl.store,
store: st,
cluster: cl,
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
}
@ -1032,7 +1034,7 @@ func TestAddMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
err := s.AddMember(context.TODO(), m)
gaction := n.Action()
s.Stop()
@ -1058,7 +1060,7 @@ func TestRemoveMember(t *testing.T) {
cl := newTestCluster(nil)
st := store.New()
cl.SetStore(store.New())
cl.AddMember(&Member{ID: 1234})
cl.AddMember(&membership.Member{ID: 1234})
s := &EtcdServer{
r: raftNode{
Node: n,
@ -1097,7 +1099,7 @@ func TestUpdateMember(t *testing.T) {
cl := newTestCluster(nil)
st := store.New()
cl.SetStore(st)
cl.AddMember(&Member{ID: 1234})
cl.AddMember(&membership.Member{ID: 1234})
s := &EtcdServer{
r: raftNode{
Node: n,
@ -1110,7 +1112,7 @@ func TestUpdateMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
s.start()
wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
err := s.UpdateMember(context.TODO(), wm)
gaction := n.Action()
s.Stop()
@ -1139,8 +1141,8 @@ func TestPublish(t *testing.T) {
cfg: &ServerConfig{TickMs: 1},
id: 1,
r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &cluster{},
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -1161,11 +1163,11 @@ func TestPublish(t *testing.T) {
if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method)
}
wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
if wpath := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != wpath {
wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
t.Errorf("path = %s, want %s", r.Path, wpath)
}
var gattr Attributes
var gattr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
t.Fatalf("unmarshal val error: %v", err)
}
@ -1182,7 +1184,7 @@ func TestPublishStopped(t *testing.T) {
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
},
cluster: &cluster{},
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
done: make(chan struct{}),
stop: make(chan struct{}),
@ -1223,8 +1225,8 @@ func TestUpdateVersion(t *testing.T) {
id: 1,
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n},
attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &cluster{},
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -1279,39 +1281,36 @@ func TestStopNotify(t *testing.T) {
func TestGetOtherPeerURLs(t *testing.T) {
tests := []struct {
membs []*Member
self string
membs []*membership.Member
wurls []string
}{
{
[]*Member{
newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
[]*membership.Member{
membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
},
"a",
[]string{},
},
{
[]*Member{
newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
[]*membership.Member{
membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
},
"a",
[]string{"http://10.0.0.2", "http://10.0.0.3"},
[]string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
},
{
[]*Member{
newTestMember(1, []string{"http://10.0.0.1"}, "a", nil),
newTestMember(3, []string{"http://10.0.0.3"}, "c", nil),
newTestMember(2, []string{"http://10.0.0.2"}, "b", nil),
[]*membership.Member{
membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
},
"a",
[]string{"http://10.0.0.2", "http://10.0.0.3"},
[]string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
},
}
for i, tt := range tests {
cl := newClusterFromMembers("", types.ID(0), tt.membs)
urls := getRemotePeerURLs(cl, tt.self)
cl := membership.NewClusterFromMembers("", types.ID(0), tt.membs)
self := "1"
urls := getRemotePeerURLs(cl, self)
if !reflect.DeepEqual(urls, tt.wurls) {
t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
}
@ -1440,3 +1439,11 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
}
return nil
}
func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
c := membership.NewCluster("")
for _, m := range membs {
c.AddMember(m)
}
return c
}

View File

@ -17,13 +17,14 @@ package etcdserver
import (
"time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
)
// isConnectedToQuorumSince checks whether the local member is connected to the
// quorum of the cluster since the given time.
func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*Member) bool {
func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
var connectedNum int
for _, m := range members {
if m.ID == self || isConnectedSince(transport, since, m.ID) {

View File

@ -53,6 +53,14 @@ func NewURLs(strs []string) (URLs, error) {
return us, nil
}
func MustNewURLs(strs []string) URLs {
urls, err := NewURLs(strs)
if err != nil {
panic(err)
}
return urls
}
func (us URLs) String() string {
return strings.Join(us.StringSlice(), ",")
}