mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #5050 from xiang90/b_v
etcdserver: save cluster version into backend
This commit is contained in:
commit
a8b7d0b63c
@ -197,7 +197,7 @@ func (c *RaftCluster) SetStore(st store.Store) { c.store = st }
|
|||||||
|
|
||||||
func (c *RaftCluster) SetBackend(be backend.Backend) {
|
func (c *RaftCluster) SetBackend(be backend.Backend) {
|
||||||
c.be = be
|
c.be = be
|
||||||
mustCreateBackendMemberBucket(c.be)
|
mustCreateBackendBuckets(c.be)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) Recover() {
|
func (c *RaftCluster) Recover() {
|
||||||
@ -360,6 +360,12 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) {
|
|||||||
}
|
}
|
||||||
c.version = ver
|
c.version = ver
|
||||||
mustDetectDowngrade(c.version)
|
mustDetectDowngrade(c.version)
|
||||||
|
if c.store != nil {
|
||||||
|
mustSaveClusterVersionToStore(c.store, ver)
|
||||||
|
}
|
||||||
|
if c.be != nil {
|
||||||
|
mustSaveClusterVersionToBackend(c.be, ver)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) IsReadyToAddNewMember() bool {
|
func (c *RaftCluster) IsReadyToAddNewMember() bool {
|
||||||
|
@ -22,6 +22,8 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/storage/backend"
|
"github.com/coreos/etcd/storage/backend"
|
||||||
"github.com/coreos/etcd/store"
|
"github.com/coreos/etcd/store"
|
||||||
|
|
||||||
|
"github.com/coreos/go-semver/semver"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -35,6 +37,7 @@ const (
|
|||||||
var (
|
var (
|
||||||
membersBucketName = []byte("members")
|
membersBucketName = []byte("members")
|
||||||
membersRemovedBuckedName = []byte("members_removed")
|
membersRemovedBuckedName = []byte("members_removed")
|
||||||
|
clusterBucketName = []byte("cluster")
|
||||||
|
|
||||||
StoreMembersPrefix = path.Join(storePrefix, "members")
|
StoreMembersPrefix = path.Join(storePrefix, "members")
|
||||||
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
|
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
|
||||||
@ -63,6 +66,15 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
|
|||||||
tx.Unlock()
|
tx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
|
||||||
|
ckey := backendClusterVersionKey()
|
||||||
|
|
||||||
|
tx := be.BatchTx()
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
|
tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
|
||||||
|
}
|
||||||
|
|
||||||
func mustSaveMemberToStore(s store.Store, m *Member) {
|
func mustSaveMemberToStore(s store.Store, m *Member) {
|
||||||
b, err := json.Marshal(m.RaftAttributes)
|
b, err := json.Marshal(m.RaftAttributes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -105,6 +117,12 @@ func mustUpdateMemberAttrInStore(s store.Store, m *Member) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mustSaveClusterVersionToStore(s store.Store, ver *semver.Version) {
|
||||||
|
if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||||
|
plog.Panicf("save cluster version should never fail: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// nodeToMember builds member from a key value node.
|
// nodeToMember builds member from a key value node.
|
||||||
// the child nodes of the given node MUST be sorted by key.
|
// the child nodes of the given node MUST be sorted by key.
|
||||||
func nodeToMember(n *store.NodeExtern) (*Member, error) {
|
func nodeToMember(n *store.NodeExtern) (*Member, error) {
|
||||||
@ -137,18 +155,27 @@ func backendMemberKey(id types.ID) []byte {
|
|||||||
return []byte(id.String())
|
return []byte(id.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustCreateBackendMemberBucket(be backend.Backend) {
|
func backendClusterVersionKey() []byte {
|
||||||
|
return []byte("clusterVersion")
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustCreateBackendBuckets(be backend.Backend) {
|
||||||
tx := be.BatchTx()
|
tx := be.BatchTx()
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
defer tx.Unlock()
|
defer tx.Unlock()
|
||||||
tx.UnsafeCreateBucket(membersBucketName)
|
tx.UnsafeCreateBucket(membersBucketName)
|
||||||
tx.UnsafeCreateBucket(membersRemovedBuckedName)
|
tx.UnsafeCreateBucket(membersRemovedBuckedName)
|
||||||
|
tx.UnsafeCreateBucket(clusterBucketName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func MemberStoreKey(id types.ID) string {
|
func MemberStoreKey(id types.ID) string {
|
||||||
return path.Join(StoreMembersPrefix, id.String())
|
return path.Join(StoreMembersPrefix, id.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func StoreClusterVersionKey() string {
|
||||||
|
return path.Join(storePrefix, "version")
|
||||||
|
}
|
||||||
|
|
||||||
func MemberAttributesStorePath(id types.ID) string {
|
func MemberAttributesStorePath(id types.ID) string {
|
||||||
return path.Join(MemberStoreKey(id), attributesSuffix)
|
return path.Join(MemberStoreKey(id), attributesSuffix)
|
||||||
}
|
}
|
||||||
|
@ -1123,8 +1123,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
|||||||
// return an empty response since there is no consumer.
|
// return an empty response since there is no consumer.
|
||||||
return Response{}
|
return Response{}
|
||||||
}
|
}
|
||||||
if r.Path == path.Join(StoreClusterPrefix, "version") {
|
if r.Path == membership.StoreClusterVersionKey() {
|
||||||
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
||||||
|
// return an empty response since there is no consumer.
|
||||||
|
return Response{}
|
||||||
}
|
}
|
||||||
return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
|
return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
|
||||||
}
|
}
|
||||||
@ -1312,7 +1314,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
|
|||||||
}
|
}
|
||||||
req := pb.Request{
|
req := pb.Request{
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Path: path.Join(StoreClusterPrefix, "version"),
|
Path: membership.StoreClusterVersionKey(),
|
||||||
Val: ver,
|
Val: ver,
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())
|
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())
|
||||||
|
Loading…
x
Reference in New Issue
Block a user