mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: handle and apply downgrade validate, enable and cancel requests
This commit is contained in:
parent
37e598a20b
commit
d230e6ba8c
@ -55,6 +55,7 @@ type applyResult struct {
|
||||
type applierV3Internal interface {
|
||||
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
|
||||
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
|
||||
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest)
|
||||
}
|
||||
|
||||
// applierV3 is the interface for processing V3 raft messages
|
||||
@ -195,6 +196,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
||||
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
|
||||
case r.ClusterMemberAttrSet != nil:
|
||||
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
|
||||
case r.DowngradeInfoSet != nil:
|
||||
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
|
||||
default:
|
||||
panic("not implemented")
|
||||
}
|
||||
@ -882,6 +885,14 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
|
||||
)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) {
|
||||
d := membership.DowngradeInfo{Enabled: false}
|
||||
if r.Enabled {
|
||||
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
|
||||
}
|
||||
a.s.cluster.SetDowngradeInfo(&d)
|
||||
}
|
||||
|
||||
type quotaApplierV3 struct {
|
||||
applierV3
|
||||
q Quota
|
||||
|
@ -355,3 +355,22 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
|
||||
}
|
||||
return membs, nil
|
||||
}
|
||||
|
||||
func convertToClusterVersion(v string) (*semver.Version, error) {
|
||||
ver, err := semver.NewVersion(v)
|
||||
if err != nil {
|
||||
// allow input version format Major.Minor
|
||||
ver, err = semver.NewVersion(v + ".0")
|
||||
if err != nil {
|
||||
return nil, ErrWrongDowngradeVersionFormat
|
||||
}
|
||||
}
|
||||
// cluster version only keeps major.minor, remove patch version
|
||||
ver = &semver.Version{Major: ver.Major, Minor: ver.Minor}
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x)
|
||||
func allowedDowngradeVersion(ver *semver.Version) *semver.Version {
|
||||
return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1}
|
||||
}
|
||||
|
@ -133,3 +133,46 @@ func TestIsCompatibleWithVers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConvertToClusterVersion(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
inputVerStr string
|
||||
expectedVer string
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
"Succeeded: Major.Minor.Patch",
|
||||
"3.4.2",
|
||||
"3.4.0",
|
||||
false,
|
||||
},
|
||||
{
|
||||
"Succeeded: Major.Minor",
|
||||
"3.4",
|
||||
"3.4.0",
|
||||
false,
|
||||
},
|
||||
{
|
||||
"Failed: wrong version format",
|
||||
"3*.9",
|
||||
"",
|
||||
true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ver, err := convertToClusterVersion(tt.inputVerStr)
|
||||
hasError := err != nil
|
||||
if hasError != tt.hasError {
|
||||
t.Errorf("Expected error status is %v; Got %v", tt.hasError, err)
|
||||
}
|
||||
if tt.hasError {
|
||||
return
|
||||
}
|
||||
if ver == nil || tt.expectedVer != ver.String() {
|
||||
t.Errorf("Expected output cluster version is %v; Got %v", tt.expectedVer, ver)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"go.etcd.io/etcd/v3/auth"
|
||||
"go.etcd.io/etcd/v3/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/v3/etcdserver/api/membership/membershippb"
|
||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||
"go.etcd.io/etcd/v3/lease"
|
||||
"go.etcd.io/etcd/v3/lease/leasehttp"
|
||||
@ -806,5 +807,93 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
|
||||
}
|
||||
|
||||
func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
|
||||
return nil, nil
|
||||
switch r.Action {
|
||||
case pb.DowngradeRequest_VALIDATE:
|
||||
return s.downgradeValidate(ctx, r.Version)
|
||||
case pb.DowngradeRequest_ENABLE:
|
||||
return s.downgradeEnable(ctx, r)
|
||||
case pb.DowngradeRequest_CANCEL:
|
||||
return s.downgradeCancel(ctx)
|
||||
default:
|
||||
return nil, ErrUnknownMethod
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
|
||||
resp := &pb.DowngradeResponse{}
|
||||
|
||||
targetVersion, err := convertToClusterVersion(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// gets leaders commit index and wait for local store to finish applying that index
|
||||
// to avoid using stale downgrade information
|
||||
err = s.linearizableReadNotify(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cv := s.ClusterVersion()
|
||||
if cv == nil {
|
||||
return nil, ErrClusterVersionUnavailable
|
||||
}
|
||||
resp.Version = cv.String()
|
||||
|
||||
allowedTargetVersion := allowedDowngradeVersion(cv)
|
||||
if !targetVersion.Equal(*allowedTargetVersion) {
|
||||
return nil, ErrInvalidDowngradeTargetVersion
|
||||
}
|
||||
|
||||
downgradeInfo := s.cluster.DowngradeInfo()
|
||||
if downgradeInfo.Enabled {
|
||||
// Todo: return the downgrade status along with the error msg
|
||||
return nil, ErrDowngradeInProcess
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
|
||||
// validate downgrade capability before starting downgrade
|
||||
v := r.Version
|
||||
lg := s.getLogger()
|
||||
if resp, err := s.downgradeValidate(ctx, v); err != nil {
|
||||
lg.Warn("reject downgrade request", zap.Error(err))
|
||||
return resp, err
|
||||
}
|
||||
targetVersion, err := convertToClusterVersion(v)
|
||||
if err != nil {
|
||||
lg.Warn("reject downgrade request", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
|
||||
_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
|
||||
if err != nil {
|
||||
lg.Warn("reject downgrade request", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
|
||||
// gets leaders commit index and wait for local store to finish applying that index
|
||||
// to avoid using stale downgrade information
|
||||
if err := s.linearizableReadNotify(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
downgradeInfo := s.cluster.DowngradeInfo()
|
||||
if !downgradeInfo.Enabled {
|
||||
return nil, ErrNoInflightDowngrade
|
||||
}
|
||||
|
||||
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
|
||||
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
|
||||
return &resp, nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user