diff --git a/etcdserver/apply.go b/etcdserver/apply.go index b85dd50d6..0135f8bcd 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -55,6 +55,7 @@ type applyResult struct { type applierV3Internal interface { ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) + DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) } // applierV3 is the interface for processing V3 raft messages @@ -195,6 +196,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) case r.ClusterMemberAttrSet != nil: a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet) + case r.DowngradeInfoSet != nil: + a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet) default: panic("not implemented") } @@ -882,6 +885,14 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt ) } +func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) { + d := membership.DowngradeInfo{Enabled: false} + if r.Enabled { + d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} + } + a.s.cluster.SetDowngradeInfo(&d) +} + type quotaApplierV3 struct { applierV3 q Quota diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 37351519c..7990a31d5 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -355,3 +355,22 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R } return membs, nil } + +func convertToClusterVersion(v string) (*semver.Version, error) { + ver, err := semver.NewVersion(v) + if err != nil { + // allow input version format Major.Minor + ver, err = semver.NewVersion(v + ".0") + if err != nil { + return nil, ErrWrongDowngradeVersionFormat + } + } + // cluster version only keeps major.minor, remove patch version + ver = &semver.Version{Major: ver.Major, Minor: ver.Minor} + return ver, nil +} + +// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x) +func allowedDowngradeVersion(ver *semver.Version) *semver.Version { + return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1} +} diff --git a/etcdserver/cluster_util_test.go b/etcdserver/cluster_util_test.go index de90d6039..04b9925ff 100644 --- a/etcdserver/cluster_util_test.go +++ b/etcdserver/cluster_util_test.go @@ -133,3 +133,46 @@ func TestIsCompatibleWithVers(t *testing.T) { } } } + +func TestConvertToClusterVersion(t *testing.T) { + tests := []struct { + name string + inputVerStr string + expectedVer string + hasError bool + }{ + { + "Succeeded: Major.Minor.Patch", + "3.4.2", + "3.4.0", + false, + }, + { + "Succeeded: Major.Minor", + "3.4", + "3.4.0", + false, + }, + { + "Failed: wrong version format", + "3*.9", + "", + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ver, err := convertToClusterVersion(tt.inputVerStr) + hasError := err != nil + if hasError != tt.hasError { + t.Errorf("Expected error status is %v; Got %v", tt.hasError, err) + } + if tt.hasError { + return + } + if ver == nil || tt.expectedVer != ver.String() { + t.Errorf("Expected output cluster version is %v; Got %v", tt.expectedVer, ver) + } + }) + } +} diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 75df7c481..92d8fb534 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/etcdserver/api/membership" + "go.etcd.io/etcd/v3/etcdserver/api/membership/membershippb" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/lease" "go.etcd.io/etcd/v3/lease/leasehttp" @@ -806,5 +807,93 @@ func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error } func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { - return nil, nil + switch r.Action { + case pb.DowngradeRequest_VALIDATE: + return s.downgradeValidate(ctx, r.Version) + case pb.DowngradeRequest_ENABLE: + return s.downgradeEnable(ctx, r) + case pb.DowngradeRequest_CANCEL: + return s.downgradeCancel(ctx) + default: + return nil, ErrUnknownMethod + } +} + +func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) { + resp := &pb.DowngradeResponse{} + + targetVersion, err := convertToClusterVersion(v) + if err != nil { + return nil, err + } + + // gets leaders commit index and wait for local store to finish applying that index + // to avoid using stale downgrade information + err = s.linearizableReadNotify(ctx) + if err != nil { + return nil, err + } + + cv := s.ClusterVersion() + if cv == nil { + return nil, ErrClusterVersionUnavailable + } + resp.Version = cv.String() + + allowedTargetVersion := allowedDowngradeVersion(cv) + if !targetVersion.Equal(*allowedTargetVersion) { + return nil, ErrInvalidDowngradeTargetVersion + } + + downgradeInfo := s.cluster.DowngradeInfo() + if downgradeInfo.Enabled { + // Todo: return the downgrade status along with the error msg + return nil, ErrDowngradeInProcess + } + return resp, nil +} + +func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { + // validate downgrade capability before starting downgrade + v := r.Version + lg := s.getLogger() + if resp, err := s.downgradeValidate(ctx, v); err != nil { + lg.Warn("reject downgrade request", zap.Error(err)) + return resp, err + } + targetVersion, err := convertToClusterVersion(v) + if err != nil { + lg.Warn("reject downgrade request", zap.Error(err)) + return nil, err + } + + raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()} + _, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + if err != nil { + lg.Warn("reject downgrade request", zap.Error(err)) + return nil, err + } + resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()} + return &resp, nil +} + +func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) { + // gets leaders commit index and wait for local store to finish applying that index + // to avoid using stale downgrade information + if err := s.linearizableReadNotify(ctx); err != nil { + return nil, err + } + + downgradeInfo := s.cluster.DowngradeInfo() + if !downgradeInfo.Enabled { + return nil, ErrNoInflightDowngrade + } + + raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false} + _, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest}) + if err != nil { + return nil, err + } + resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()} + return &resp, nil }