diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go
index 467992e1f..e5e943d15 100644
--- a/server/etcdserver/adapters.go
+++ b/server/etcdserver/adapters.go
@@ -18,12 +18,14 @@ import (
 	"context"
 
 	"github.com/coreos/go-semver/semver"
-	"go.etcd.io/etcd/server/v3/storage/backend"
-	"go.etcd.io/etcd/server/v3/storage/schema"
 	"go.uber.org/zap"
 
+	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
+	"go.etcd.io/etcd/api/v3/membershippb"
 	"go.etcd.io/etcd/api/v3/version"
 	serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
+	"go.etcd.io/etcd/server/v3/storage/backend"
+	"go.etcd.io/etcd/server/v3/storage/schema"
 )
 
 // serverVersionAdapter implements Server interface needed by serverversion.Monitor
@@ -46,12 +48,26 @@ func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
 	s.GoAttach(func() { s.updateClusterVersionV2(version) })
 }
 
-func (s *serverVersionAdapter) DowngradeCancel() {
-	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-	if _, err := s.downgradeCancel(ctx); err != nil {
-		s.lg.Warn("failed to cancel downgrade", zap.Error(err))
-	}
-	cancel()
+// LinearizableReadNotify exposes the server's linearizableReadNotify to the
+// version package and returns whatever error that call reports.
+func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error {
+	return s.linearizableReadNotify(ctx)
+}
+
+// DowngradeEnable submits, via raftRequest, a DowngradeInfoSetRequest with
+// Enabled set to true and Ver set to targetVersion.
+func (s *serverVersionAdapter) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
+	req := &membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
+	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: req})
+	return err
+}
+
+// DowngradeCancel submits, via raftRequest, a DowngradeInfoSetRequest with
+// Enabled set to false.
+func (s *serverVersionAdapter) DowngradeCancel(ctx context.Context) error {
+	req := &membershippb.DowngradeInfoSetRequest{Enabled: false}
+	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: req})
+	return err
 }
 
 func (s *serverVersionAdapter) GetClusterVersion() *semver.Version {
diff --git a/server/etcdserver/api/v3rpc/util.go b/server/etcdserver/api/v3rpc/util.go
index f054f29c8..cef6476bc
100644
--- a/server/etcdserver/api/v3rpc/util.go
+++ b/server/etcdserver/api/v3rpc/util.go
@@ -23,6 +23,7 @@ import (
 	"go.etcd.io/etcd/server/v3/auth"
 	"go.etcd.io/etcd/server/v3/etcdserver"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
+	"go.etcd.io/etcd/server/v3/etcdserver/version"
 	"go.etcd.io/etcd/server/v3/lease"
 	"go.etcd.io/etcd/server/v3/storage/mvcc"
 
@@ -58,11 +59,11 @@ var toGRPCErrorMap = map[error]error{
 	etcdserver.ErrCorrupt:             rpctypes.ErrGRPCCorrupt,
 	etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
 
-	etcdserver.ErrClusterVersionUnavailable:     rpctypes.ErrGRPCClusterVersionUnavailable,
-	etcdserver.ErrWrongDowngradeVersionFormat:   rpctypes.ErrGRPCWrongDowngradeVersionFormat,
-	etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
-	etcdserver.ErrDowngradeInProcess:            rpctypes.ErrGRPCDowngradeInProcess,
-	etcdserver.ErrNoInflightDowngrade:           rpctypes.ErrGRPCNoInflightDowngrade,
+	etcdserver.ErrClusterVersionUnavailable:   rpctypes.ErrGRPCClusterVersionUnavailable,
+	etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
+	version.ErrInvalidDowngradeTargetVersion:  rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
+	version.ErrDowngradeInProcess:             rpctypes.ErrGRPCDowngradeInProcess,
+	version.ErrNoInflightDowngrade:            rpctypes.ErrGRPCNoInflightDowngrade,
 
 	lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
 	lease.ErrLeaseExists:   rpctypes.ErrGRPCLeaseExist,
diff --git a/server/etcdserver/errors.go b/server/etcdserver/errors.go
index dc2a85fdd..9d9b07e13 100644
--- a/server/etcdserver/errors.go
+++ b/server/etcdserver/errors.go
@@ -20,30 +20,27 @@ import (
 )
 
 var (
-	ErrUnknownMethod                 = errors.New("etcdserver: unknown method")
-	ErrStopped                       = errors.New("etcdserver: server stopped")
-	ErrCanceled                      = errors.New("etcdserver: request cancelled")
-	ErrTimeout                       = errors.New("etcdserver: request timed out")
-	ErrTimeoutDueToLeaderFail        = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
-	ErrTimeoutDueToConnectionLost    = errors.New("etcdserver: request timed out, possibly due to connection lost")
-	ErrTimeoutLeaderTransfer         = errors.New("etcdserver: request timed out, leader transfer took too long")
-	ErrLeaderChanged                 = errors.New("etcdserver: leader changed")
-	ErrNotEnoughStartedMembers       = errors.New("etcdserver: re-configuration failed due to not enough started members")
-	ErrLearnerNotReady               = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
-	ErrNoLeader                      = errors.New("etcdserver: no leader")
-	ErrNotLeader                     = errors.New("etcdserver: not leader")
-	ErrRequestTooLarge               = errors.New("etcdserver: request is too large")
-	ErrNoSpace                       = errors.New("etcdserver: no space")
-	ErrTooManyRequests               = errors.New("etcdserver: too many requests")
-	ErrUnhealthy                     = errors.New("etcdserver: unhealthy cluster")
-	ErrKeyNotFound                   = errors.New("etcdserver: key not found")
-	ErrCorrupt                       = errors.New("etcdserver: corrupt cluster")
-	ErrBadLeaderTransferee           = errors.New("etcdserver: bad leader transferee")
-	ErrClusterVersionUnavailable     = errors.New("etcdserver: cluster version not found during downgrade")
-	ErrWrongDowngradeVersionFormat   = errors.New("etcdserver: wrong downgrade target version format")
-	ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
-	ErrDowngradeInProcess            = errors.New("etcdserver: cluster has a downgrade job in progress")
-	ErrNoInflightDowngrade           = errors.New("etcdserver: no inflight downgrade job")
+	ErrUnknownMethod               = errors.New("etcdserver: unknown method")
+	ErrStopped                     = errors.New("etcdserver: server stopped")
+	ErrCanceled                    = errors.New("etcdserver: request cancelled")
+	ErrTimeout                     = errors.New("etcdserver: request timed out")
+	ErrTimeoutDueToLeaderFail      = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
+	ErrTimeoutDueToConnectionLost  = errors.New("etcdserver: request timed out, possibly due to connection lost")
+	ErrTimeoutLeaderTransfer       = errors.New("etcdserver: request timed out, leader transfer took too long")
+	ErrLeaderChanged               = errors.New("etcdserver: leader changed")
+	ErrNotEnoughStartedMembers     = errors.New("etcdserver: re-configuration failed due to not enough started members")
+	ErrLearnerNotReady             = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
+	ErrNoLeader                    = errors.New("etcdserver: no leader")
+	ErrNotLeader                   = errors.New("etcdserver: not leader")
+	ErrRequestTooLarge             = errors.New("etcdserver: request is too large")
+	ErrNoSpace                     = errors.New("etcdserver: no space")
+	ErrTooManyRequests             = errors.New("etcdserver: too many requests")
+	ErrUnhealthy                   = errors.New("etcdserver: unhealthy cluster")
+	ErrKeyNotFound                 = errors.New("etcdserver: key not found")
+	ErrCorrupt                     = errors.New("etcdserver: corrupt cluster")
+	ErrBadLeaderTransferee         = errors.New("etcdserver: bad leader transferee")
+	ErrClusterVersionUnavailable   = errors.New("etcdserver: cluster version not found during downgrade")
+	ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
 )
 
 type DiscoveryError struct {
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index bec28e45e..2de2477ec 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -2359,3 +2359,9 @@ func (s *EtcdServer) IsMemberExist(id types.ID) bool {
 func (s *EtcdServer) raftStatus() raft.Status {
 	return s.r.Node.Status()
 }
+
+// Version returns a serverversion.Manager built from the server's logger and
+// the adapter returned by newServerVersionAdapter(s); a new Manager is constructed on every call.
+func (s *EtcdServer) Version() *serverversion.Manager {
+	return serverversion.NewManager(s.Logger(), newServerVersionAdapter(s))
+}
diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go
index 8e745b880..9885fc01c 100644
--- a/server/etcdserver/v3_server.go
+++ b/server/etcdserver/v3_server.go
@@ -23,12 +23,10 @@ import (
 	"time"
 
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
-	"go.etcd.io/etcd/api/v3/membershippb"
 	"go.etcd.io/etcd/pkg/v3/traceutil"
 	"go.etcd.io/etcd/raft/v3"
 	"go.etcd.io/etcd/server/v3/auth"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
-	"go.etcd.io/etcd/server/v3/etcdserver/version"
 	"go.etcd.io/etcd/server/v3/lease"
 	"go.etcd.io/etcd/server/v3/lease/leasehttp"
 	"go.etcd.io/etcd/server/v3/storage/mvcc"
@@ -920,48 +918,27 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
 		return nil, err
 	}
 
-	// gets leaders commit index and wait for local store to finish applying that index
-	// to avoid using stale downgrade information
-	err = s.linearizableReadNotify(ctx)
-	if err != nil {
-		return nil, err
-	}
-
 	cv := s.ClusterVersion()
 	if cv == nil {
 		return nil, ErrClusterVersionUnavailable
 	}
 	resp.Version = cv.String()
-
-	allowedTargetVersion := version.AllowedDowngradeVersion(cv)
-	if !targetVersion.Equal(*allowedTargetVersion) {
-		return nil, ErrInvalidDowngradeTargetVersion
+	err = s.Version().DowngradeValidate(ctx, targetVersion)
+	if err != nil {
+		return nil, err
 	}
 
-	downgradeInfo := s.cluster.DowngradeInfo()
-	if downgradeInfo.Enabled {
-		// Todo: return the downgrade status along with the error msg
-		return nil, ErrDowngradeInProcess
-	}
 	return resp, nil
 }
 
 func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
-	// validate downgrade capability before starting downgrade
-	v := r.Version
 	lg := s.Logger()
-	if resp, err := s.downgradeValidate(ctx, v); err != nil {
-		lg.Warn("reject downgrade request", zap.Error(err))
-		return resp, err
-	}
-	targetVersion, err := convertToClusterVersion(v)
+	targetVersion, err := convertToClusterVersion(r.Version)
 	if err != nil {
 		lg.Warn("reject downgrade request", zap.Error(err))
 		return nil, err
 	}
-
-	raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
-	_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
+	err = s.Version().DowngradeEnable(ctx, targetVersion)
 	if err != nil {
 		lg.Warn("reject downgrade request", zap.Error(err))
 		return nil, err
@@ -971,21 +948,12 @@ func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest
 }
 
 func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
-	// gets leaders commit index and wait for local store to finish applying that index
-	// to avoid using stale downgrade information
-	if err := s.linearizableReadNotify(ctx); err != nil {
-		return nil, err
-	}
-
-	downgradeInfo := s.cluster.DowngradeInfo()
-	if !downgradeInfo.Enabled {
-		return nil, ErrNoInflightDowngrade
-	}
-
-	raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
-	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
+	err := s.Version().DowngradeCancel(ctx)
 	if err != nil {
+		// Keep returning the failure (e.g. version.ErrNoInflightDowngrade) as before this change,
+		// so the caller never gets a success response for a cancel that did not happen.
+		s.lg.Warn("failed to cancel downgrade", zap.Error(err))
 		return nil, err
 	}
 	resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
 	return &resp, nil
diff --git a/server/etcdserver/version/errors.go b/server/etcdserver/version/errors.go
new file mode 100644
index 000000000..906aa9f41
--- /dev/null
+++ b/server/etcdserver/version/errors.go
@@ -0,0 +1,23 @@
+// Copyright 2021 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package version
+
+import "errors"
+
+var (
+	ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
+	ErrDowngradeInProcess            = errors.New("etcdserver: cluster has a downgrade job in progress")
+	ErrNoInflightDowngrade           = errors.New("etcdserver: no inflight downgrade job")
+)
diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go
index 43c78b47a..1a8e73e47 100644
--- a/server/etcdserver/version/monitor.go
+++ b/server/etcdserver/version/monitor.go
@@ -15,6 +15,8 @@
 package version
 
 import (
+	"context"
+
 	"github.com/coreos/go-semver/semver"
 	"go.etcd.io/etcd/api/v3/version"
 	"go.uber.org/zap"
@@ -32,7 +34,9 @@ type Server interface {
 	GetDowngradeInfo() *DowngradeInfo
 	GetMembersVersions() map[string]*version.Versions
 	UpdateClusterVersion(string)
-	DowngradeCancel()
+	LinearizableReadNotify(ctx context.Context) error
+	DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error
+	DowngradeCancel(ctx context.Context) error
 
 	GetStorageVersion() *semver.Version
 	UpdateStorageVersion(semver.Version)
@@ -101,7 +105,12 @@ func (m *Monitor) CancelDowngradeIfNeeded() {
 	v := semver.Must(semver.NewVersion(targetVersion))
 	if m.versionsMatchTarget(v) {
 		m.lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
-		m.s.DowngradeCancel()
+		// NOTE(review): context.Background() carries no deadline, whereas the adapter method this replaces
+		// bounded the cancel with s.Cfg.ReqTimeout() - confirm the Server implementation applies its own timeout.
+		err := m.s.DowngradeCancel(context.Background())
+		if err != nil {
+			m.lg.Warn("failed to cancel downgrade", zap.Error(err))
+		}
 	}
 }
 
diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go
index 6bc5e836a..ffc908e7b 100644
--- a/server/etcdserver/version/monitor_test.go
+++ b/server/etcdserver/version/monitor_test.go
@@ -1,6 +1,7 @@
 package version
 
 import (
+	"context"
 	"reflect"
 	"testing"
 
@@ -339,8 +340,17 @@ func (s *storageMock) UpdateClusterVersion(version string) {
 	s.clusterVersion = semver.New(version)
 }
 
-func (s *storageMock) DowngradeCancel() {
+func (s *storageMock) LinearizableReadNotify(ctx context.Context) error {
+	return nil
+}
+
+func (s *storageMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
+	return nil
+}
+
+func (s *storageMock) DowngradeCancel(ctx context.Context) error {
 	s.downgradeInfo = nil
+	return nil
 }
 
 func (s *storageMock) GetClusterVersion() *semver.Version {
diff --git a/server/etcdserver/version/version.go b/server/etcdserver/version/version.go
new file mode 100644
index 000000000..b2c62b15c
--- /dev/null
+++ b/server/etcdserver/version/version.go
@@ -0,0 +1,81 @@
+// Copyright 2021 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package version
+
+import (
+	"context"
+
+	"github.com/coreos/go-semver/semver"
+	"go.uber.org/zap"
+)
+
+// Manager contains logic to manage etcd cluster version downgrade process.
+type Manager struct {
+	lg *zap.Logger
+	s  Server
+}
+
+// NewManager returns a new Manager holding the given logger and Server.
+func NewManager(lg *zap.Logger, s Server) *Manager {
+	return &Manager{
+		lg: lg,
+		s:  s,
+	}
+}
+
+// DowngradeValidate validates if cluster is downgradable to provided target version and returns error if not.
+func (m *Manager) DowngradeValidate(ctx context.Context, targetVersion *semver.Version) error {
+	// Wait until the local store has applied the leader's commit index, so the
+	// checks below do not run against stale downgrade information.
+	err := m.s.LinearizableReadNotify(ctx)
+	if err != nil {
+		return err
+	}
+	cv := m.s.GetClusterVersion()
+	allowedTargetVersion := AllowedDowngradeVersion(cv)
+	if !targetVersion.Equal(*allowedTargetVersion) {
+		return ErrInvalidDowngradeTargetVersion
+	}
+
+	downgradeInfo := m.s.GetDowngradeInfo()
+	if downgradeInfo != nil && downgradeInfo.Enabled {
+		// TODO: return the downgrade status along with the error msg
+		return ErrDowngradeInProcess
+	}
+	return nil
+}
+
+// DowngradeEnable validates targetVersion with DowngradeValidate and, if that succeeds, asks the Server to enable the downgrade.
+func (m *Manager) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
+	// Validate first; ctx (not context.Background()) is forwarded so the caller's deadline and cancellation also bound the enable request.
+	err := m.DowngradeValidate(ctx, targetVersion)
+	if err != nil {
+		return err
+	}
+	return m.s.DowngradeEnable(ctx, targetVersion)
+}
+
+// DowngradeCancel cancels ongoing downgrade process.
+func (m *Manager) DowngradeCancel(ctx context.Context) error {
+	if err := m.s.LinearizableReadNotify(ctx); err != nil {
+		return err
+	}
+	downgradeInfo := m.s.GetDowngradeInfo()
+	// GetDowngradeInfo may return nil (DowngradeValidate guards against it too); treat nil as "no downgrade in flight".
+	if downgradeInfo == nil || !downgradeInfo.Enabled {
+		return ErrNoInflightDowngrade
+	}
+	return m.s.DowngradeCancel(ctx)
+}
diff --git a/server/etcdserver/version/version_test.go b/server/etcdserver/version/version_test.go
index 91a186729..5dd01d01e 100644
--- a/server/etcdserver/version/version_test.go
+++ b/server/etcdserver/version/version_test.go
@@ -15,6 +15,7 @@
 package version
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"testing"
@@ -73,7 +74,7 @@ func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMoc
 			serverVersion:  ver,
 			storageVersion: majorMinVer,
 		}
-		m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m)
+		m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m)
 		cluster.members = append(cluster.members, m)
 	}
 	cluster.members[0].isLeader = true
@@ -140,8 +141,17 @@ func (m *memberMock) UpdateClusterVersion(version string) {
 	m.cluster.clusterVersion = *semver.New(version)
 }
 
-func (m *memberMock) DowngradeCancel() {
+func (m *memberMock) LinearizableReadNotify(ctx context.Context) error {
+	return nil
+}
+
+func (m *memberMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
+	return nil
+}
+
+func (m *memberMock) DowngradeCancel(context.Context) error {
 	m.cluster.downgradeInfo = nil
+	return nil
 }
 
 func (m *memberMock) GetClusterVersion() *semver.Version {