From e9735b7bd0d2cdee27909aee1219058f9d23f7b3 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 12 Apr 2016 11:37:22 -0700 Subject: [PATCH] etcdserver: save cluster version into backend --- etcdserver/membership/cluster.go | 8 +++++++- etcdserver/membership/store.go | 29 ++++++++++++++++++++++++++++- etcdserver/server.go | 6 ++++-- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/etcdserver/membership/cluster.go b/etcdserver/membership/cluster.go index 6bb047235..3a6717304 100644 --- a/etcdserver/membership/cluster.go +++ b/etcdserver/membership/cluster.go @@ -197,7 +197,7 @@ func (c *RaftCluster) SetStore(st store.Store) { c.store = st } func (c *RaftCluster) SetBackend(be backend.Backend) { c.be = be - mustCreateBackendMemberBucket(c.be) + mustCreateBackendBuckets(c.be) } func (c *RaftCluster) Recover() { @@ -360,6 +360,12 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) { } c.version = ver mustDetectDowngrade(c.version) + if c.store != nil { + mustSaveClusterVersionToStore(c.store, ver) + } + if c.be != nil { + mustSaveClusterVersionToBackend(c.be, ver) + } } func (c *RaftCluster) IsReadyToAddNewMember() bool { diff --git a/etcdserver/membership/store.go b/etcdserver/membership/store.go index d253fb036..5887ec99a 100644 --- a/etcdserver/membership/store.go +++ b/etcdserver/membership/store.go @@ -22,6 +22,8 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/store" + + "github.com/coreos/go-semver/semver" ) const ( @@ -35,6 +37,7 @@ const ( var ( membersBucketName = []byte("members") membersRemovedBuckedName = []byte("members_removed") + clusterBucketName = []byte("cluster") StoreMembersPrefix = path.Join(storePrefix, "members") storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") @@ -63,6 +66,15 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { tx.Unlock() } +func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { + ckey := backendClusterVersionKey() + + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String())) +} + func mustSaveMemberToStore(s store.Store, m *Member) { b, err := json.Marshal(m.RaftAttributes) if err != nil { @@ -105,6 +117,12 @@ func mustUpdateMemberAttrInStore(s store.Store, m *Member) { } } +func mustSaveClusterVersionToStore(s store.Store, ver *semver.Version) { + if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { + plog.Panicf("save cluster version should never fail: %v", err) + } +} + // nodeToMember builds member from a key value node. // the child nodes of the given node MUST be sorted by key. func nodeToMember(n *store.NodeExtern) (*Member, error) { @@ -137,18 +155,27 @@ func backendMemberKey(id types.ID) []byte { return []byte(id.String()) } -func mustCreateBackendMemberBucket(be backend.Backend) { +func backendClusterVersionKey() []byte { + return []byte("clusterVersion") +} + +func mustCreateBackendBuckets(be backend.Backend) { tx := be.BatchTx() tx.Lock() defer tx.Unlock() tx.UnsafeCreateBucket(membersBucketName) tx.UnsafeCreateBucket(membersRemovedBuckedName) + tx.UnsafeCreateBucket(clusterBucketName) } func MemberStoreKey(id types.ID) string { return path.Join(StoreMembersPrefix, id.String()) } +func StoreClusterVersionKey() string { + return path.Join(storePrefix, "version") +} + func MemberAttributesStorePath(id types.ID) string { return path.Join(MemberStoreKey(id), attributesSuffix) } diff --git a/etcdserver/server.go b/etcdserver/server.go index a16fe1fef..8b1befff3 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1123,8 +1123,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { // return an empty response since there is no consumer. return Response{} } - if r.Path == path.Join(StoreClusterPrefix, "version") { + if r.Path == membership.StoreClusterVersionKey() { s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + // return an empty response since there is no consumer. + return Response{} } return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) } @@ -1312,7 +1314,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) { } req := pb.Request{ Method: "PUT", - Path: path.Join(StoreClusterPrefix, "version"), + Path: membership.StoreClusterVersionKey(), Val: ver, } ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())