mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #11801 from YoyinZyc/downgrade-server
[Etcd downgrade] Implement downgrade validate, enable and cancel
This commit is contained in:
commit
5e2815e9cd
@ -76,6 +76,12 @@ var (
|
|||||||
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
|
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
|
||||||
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()
|
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()
|
||||||
|
|
||||||
|
ErrGRPCClusterVersionUnavailable = status.New(codes.Unavailable, "etcdserver: cluster version not found during downgrade").Err()
|
||||||
|
ErrGRPCWrongDowngradeVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong downgrade target version format").Err()
|
||||||
|
ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid downgrade target version").Err()
|
||||||
|
ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err()
|
||||||
|
ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err()
|
||||||
|
|
||||||
errStringToError = map[string]error{
|
errStringToError = map[string]error{
|
||||||
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
|
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
|
||||||
ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
|
ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
|
||||||
@ -132,6 +138,12 @@ var (
|
|||||||
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
|
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
|
||||||
ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner,
|
ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner,
|
||||||
ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee,
|
ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee,
|
||||||
|
|
||||||
|
ErrorDesc(ErrGRPCClusterVersionUnavailable): ErrGRPCClusterVersionUnavailable,
|
||||||
|
ErrorDesc(ErrGRPCWrongDowngradeVersionFormat): ErrGRPCWrongDowngradeVersionFormat,
|
||||||
|
ErrorDesc(ErrGRPCInvalidDowngradeTargetVersion): ErrGRPCInvalidDowngradeTargetVersion,
|
||||||
|
ErrorDesc(ErrGRPCDowngradeInProcess): ErrGRPCDowngradeInProcess,
|
||||||
|
ErrorDesc(ErrGRPCNoInflightDowngrade): ErrGRPCNoInflightDowngrade,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -190,6 +202,12 @@ var (
|
|||||||
ErrUnhealthy = Error(ErrGRPCUnhealthy)
|
ErrUnhealthy = Error(ErrGRPCUnhealthy)
|
||||||
ErrCorrupt = Error(ErrGRPCCorrupt)
|
ErrCorrupt = Error(ErrGRPCCorrupt)
|
||||||
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)
|
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)
|
||||||
|
|
||||||
|
ErrClusterVersionUnavailable = Error(ErrGRPCClusterVersionUnavailable)
|
||||||
|
ErrWrongDowngradeVersionFormat = Error(ErrGRPCWrongDowngradeVersionFormat)
|
||||||
|
ErrInvalidDowngradeTargetVersion = Error(ErrGRPCInvalidDowngradeTargetVersion)
|
||||||
|
ErrDowngradeInProcess = Error(ErrGRPCDowngradeInProcess)
|
||||||
|
ErrNoInflightDowngrade = Error(ErrGRPCNoInflightDowngrade)
|
||||||
)
|
)
|
||||||
|
|
||||||
// EtcdError defines gRPC server errors.
|
// EtcdError defines gRPC server errors.
|
||||||
|
@ -58,6 +58,12 @@ var toGRPCErrorMap = map[error]error{
|
|||||||
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
|
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
|
||||||
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
|
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
|
||||||
|
|
||||||
|
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
|
||||||
|
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
|
||||||
|
etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
|
||||||
|
etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
|
||||||
|
etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,
|
||||||
|
|
||||||
lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
|
lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
|
||||||
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
|
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
|
||||||
lease.ErrLeaseTTLTooLarge: rpctypes.ErrGRPCLeaseTTLTooLarge,
|
lease.ErrLeaseTTLTooLarge: rpctypes.ErrGRPCLeaseTTLTooLarge,
|
||||||
|
@ -55,6 +55,7 @@ type applyResult struct {
|
|||||||
// applierV3Internal lists the handlers for cluster-internal raft requests.
// Each method receives the matching membershippb request and returns
// nothing to the caller.
type applierV3Internal interface {
	// ClusterVersionSet handles a ClusterVersionSetRequest.
	ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
	// ClusterMemberAttrSet handles a ClusterMemberAttrSetRequest.
	ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
	// DowngradeInfoSet handles a DowngradeInfoSetRequest.
	DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest)
}
|
||||||
|
|
||||||
// applierV3 is the interface for processing V3 raft messages
|
// applierV3 is the interface for processing V3 raft messages
|
||||||
@ -195,6 +196,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|||||||
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
|
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
|
||||||
case r.ClusterMemberAttrSet != nil:
|
case r.ClusterMemberAttrSet != nil:
|
||||||
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
|
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
|
||||||
|
case r.DowngradeInfoSet != nil:
|
||||||
|
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
|
||||||
default:
|
default:
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
@ -882,6 +885,14 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) {
|
||||||
|
d := membership.DowngradeInfo{Enabled: false}
|
||||||
|
if r.Enabled {
|
||||||
|
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
|
||||||
|
}
|
||||||
|
a.s.cluster.SetDowngradeInfo(&d)
|
||||||
|
}
|
||||||
|
|
||||||
type quotaApplierV3 struct {
|
type quotaApplierV3 struct {
|
||||||
applierV3
|
applierV3
|
||||||
q Quota
|
q Quota
|
||||||
|
@ -355,3 +355,22 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
|
|||||||
}
|
}
|
||||||
return membs, nil
|
return membs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func convertToClusterVersion(v string) (*semver.Version, error) {
|
||||||
|
ver, err := semver.NewVersion(v)
|
||||||
|
if err != nil {
|
||||||
|
// allow input version format Major.Minor
|
||||||
|
ver, err = semver.NewVersion(v + ".0")
|
||||||
|
if err != nil {
|
||||||
|
return nil, ErrWrongDowngradeVersionFormat
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// cluster version only keeps major.minor, remove patch version
|
||||||
|
ver = &semver.Version{Major: ver.Major, Minor: ver.Minor}
|
||||||
|
return ver, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x)
|
||||||
|
func allowedDowngradeVersion(ver *semver.Version) *semver.Version {
|
||||||
|
return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1}
|
||||||
|
}
|
||||||
|
@ -133,3 +133,46 @@ func TestIsCompatibleWithVers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConvertToClusterVersion(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputVerStr string
|
||||||
|
expectedVer string
|
||||||
|
hasError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"Succeeded: Major.Minor.Patch",
|
||||||
|
"3.4.2",
|
||||||
|
"3.4.0",
|
||||||
|
false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Succeeded: Major.Minor",
|
||||||
|
"3.4",
|
||||||
|
"3.4.0",
|
||||||
|
false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Failed: wrong version format",
|
||||||
|
"3*.9",
|
||||||
|
"",
|
||||||
|
true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ver, err := convertToClusterVersion(tt.inputVerStr)
|
||||||
|
hasError := err != nil
|
||||||
|
if hasError != tt.hasError {
|
||||||
|
t.Errorf("Expected error status is %v; Got %v", tt.hasError, err)
|
||||||
|
}
|
||||||
|
if tt.hasError {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if ver == nil || tt.expectedVer != ver.String() {
|
||||||
|
t.Errorf("Expected output cluster version is %v; Got %v", tt.expectedVer, ver)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -20,25 +20,30 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Sentinel errors returned by the etcd server. Every message carries the
// "etcdserver: " prefix.
var (
	// Request dispatch, shutdown, cancellation and timeouts.
	ErrUnknownMethod              = errors.New("etcdserver: unknown method")
	ErrStopped                    = errors.New("etcdserver: server stopped")
	ErrCanceled                   = errors.New("etcdserver: request cancelled")
	ErrTimeout                    = errors.New("etcdserver: request timed out")
	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
	ErrTimeoutLeaderTransfer      = errors.New("etcdserver: request timed out, leader transfer took too long")

	// Leadership and membership.
	ErrLeaderChanged           = errors.New("etcdserver: leader changed")
	ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
	ErrLearnerNotReady         = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
	ErrNoLeader                = errors.New("etcdserver: no leader")
	ErrNotLeader               = errors.New("etcdserver: not leader")
	ErrBadLeaderTransferee     = errors.New("etcdserver: bad leader transferee")

	// Request limits, storage and cluster health.
	ErrRequestTooLarge = errors.New("etcdserver: request is too large")
	ErrNoSpace         = errors.New("etcdserver: no space")
	ErrTooManyRequests = errors.New("etcdserver: too many requests")
	ErrUnhealthy       = errors.New("etcdserver: unhealthy cluster")
	ErrKeyNotFound     = errors.New("etcdserver: key not found")
	ErrCorrupt         = errors.New("etcdserver: corrupt cluster")

	// Downgrade validate / enable / cancel.
	ErrClusterVersionUnavailable     = errors.New("etcdserver: cluster version not found during downgrade")
	ErrWrongDowngradeVersionFormat   = errors.New("etcdserver: wrong downgrade target version format")
	ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
	ErrDowngradeInProcess            = errors.New("etcdserver: cluster has a downgrade job in progress")
	ErrNoInflightDowngrade           = errors.New("etcdserver: no inflight downgrade job")
)
|
||||||
|
|
||||||
type DiscoveryError struct {
|
type DiscoveryError struct {
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/v3/auth"
|
"go.etcd.io/etcd/v3/auth"
|
||||||
"go.etcd.io/etcd/v3/etcdserver/api/membership"
|
"go.etcd.io/etcd/v3/etcdserver/api/membership"
|
||||||
|
"go.etcd.io/etcd/v3/etcdserver/api/membership/membershippb"
|
||||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||||
"go.etcd.io/etcd/v3/lease"
|
"go.etcd.io/etcd/v3/lease"
|
||||||
"go.etcd.io/etcd/v3/lease/leasehttp"
|
"go.etcd.io/etcd/v3/lease/leasehttp"
|
||||||
@ -806,5 +807,93 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
|
func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
|
||||||
return nil, nil
|
switch r.Action {
|
||||||
|
case pb.DowngradeRequest_VALIDATE:
|
||||||
|
return s.downgradeValidate(ctx, r.Version)
|
||||||
|
case pb.DowngradeRequest_ENABLE:
|
||||||
|
return s.downgradeEnable(ctx, r)
|
||||||
|
case pb.DowngradeRequest_CANCEL:
|
||||||
|
return s.downgradeCancel(ctx)
|
||||||
|
default:
|
||||||
|
return nil, ErrUnknownMethod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
|
||||||
|
resp := &pb.DowngradeResponse{}
|
||||||
|
|
||||||
|
targetVersion, err := convertToClusterVersion(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// gets leaders commit index and wait for local store to finish applying that index
|
||||||
|
// to avoid using stale downgrade information
|
||||||
|
err = s.linearizableReadNotify(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cv := s.ClusterVersion()
|
||||||
|
if cv == nil {
|
||||||
|
return nil, ErrClusterVersionUnavailable
|
||||||
|
}
|
||||||
|
resp.Version = cv.String()
|
||||||
|
|
||||||
|
allowedTargetVersion := allowedDowngradeVersion(cv)
|
||||||
|
if !targetVersion.Equal(*allowedTargetVersion) {
|
||||||
|
return nil, ErrInvalidDowngradeTargetVersion
|
||||||
|
}
|
||||||
|
|
||||||
|
downgradeInfo := s.cluster.DowngradeInfo()
|
||||||
|
if downgradeInfo.Enabled {
|
||||||
|
// Todo: return the downgrade status along with the error msg
|
||||||
|
return nil, ErrDowngradeInProcess
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
|
||||||
|
// validate downgrade capability before starting downgrade
|
||||||
|
v := r.Version
|
||||||
|
lg := s.getLogger()
|
||||||
|
if resp, err := s.downgradeValidate(ctx, v); err != nil {
|
||||||
|
lg.Warn("reject downgrade request", zap.Error(err))
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
targetVersion, err := convertToClusterVersion(v)
|
||||||
|
if err != nil {
|
||||||
|
lg.Warn("reject downgrade request", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
|
||||||
|
_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
|
||||||
|
if err != nil {
|
||||||
|
lg.Warn("reject downgrade request", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
|
||||||
|
return &resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
|
||||||
|
// gets leaders commit index and wait for local store to finish applying that index
|
||||||
|
// to avoid using stale downgrade information
|
||||||
|
if err := s.linearizableReadNotify(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
downgradeInfo := s.cluster.DowngradeInfo()
|
||||||
|
if !downgradeInfo.Enabled {
|
||||||
|
return nil, ErrNoInflightDowngrade
|
||||||
|
}
|
||||||
|
|
||||||
|
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
|
||||||
|
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
|
||||||
|
return &resp, nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user