server: Move downgrade API logic into version package

This commit is contained in:
Marek Siarkowicz 2021-10-05 16:54:52 +02:00
parent 1e5e57f268
commit e47c3c22d2
10 changed files with 193 additions and 84 deletions

View File

@ -18,12 +18,14 @@ import (
"context"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/api/v3/version"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)
// serverVersionAdapter implements Server interface needed by serverversion.Monitor
@ -46,12 +48,20 @@ func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
s.GoAttach(func() { s.updateClusterVersionV2(version) })
}
func (s *serverVersionAdapter) DowngradeCancel() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error {
return s.linearizableReadNotify(ctx)
}
func (s *serverVersionAdapter) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
return err
}
func (s *serverVersionAdapter) DowngradeCancel(ctx context.Context) error {
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
return err
}
func (s *serverVersionAdapter) GetClusterVersion() *semver.Version {

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc"
@ -58,11 +59,11 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,
lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,

View File

@ -20,30 +20,27 @@ import (
)
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade")
ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress")
ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job")
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade")
ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
)
type DiscoveryError struct {

View File

@ -2359,3 +2359,7 @@ func (s *EtcdServer) IsMemberExist(id types.ID) bool {
func (s *EtcdServer) raftStatus() raft.Status {
return s.r.Node.Status()
}
func (s *EtcdServer) Version() *serverversion.Manager {
return serverversion.NewManager(s.Logger(), newServerVersionAdapter(s))
}

View File

@ -23,12 +23,10 @@ import (
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/storage/mvcc"
@ -920,48 +918,27 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
return nil, err
}
// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
cv := s.ClusterVersion()
if cv == nil {
return nil, ErrClusterVersionUnavailable
}
resp.Version = cv.String()
allowedTargetVersion := version.AllowedDowngradeVersion(cv)
if !targetVersion.Equal(*allowedTargetVersion) {
return nil, ErrInvalidDowngradeTargetVersion
err = s.Version().DowngradeValidate(ctx, targetVersion)
if err != nil {
return nil, err
}
downgradeInfo := s.cluster.DowngradeInfo()
if downgradeInfo.Enabled {
// Todo: return the downgrade status along with the error msg
return nil, ErrDowngradeInProcess
}
return resp, nil
}
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
// validate downgrade capability before starting downgrade
v := r.Version
lg := s.Logger()
if resp, err := s.downgradeValidate(ctx, v); err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return resp, err
}
targetVersion, err := convertToClusterVersion(v)
targetVersion, err := convertToClusterVersion(r.Version)
if err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return nil, err
}
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
err = s.Version().DowngradeEnable(ctx, targetVersion)
if err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return nil, err
@ -971,21 +948,9 @@ func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest
}
func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
if err := s.linearizableReadNotify(ctx); err != nil {
return nil, err
}
downgradeInfo := s.cluster.DowngradeInfo()
if !downgradeInfo.Enabled {
return nil, ErrNoInflightDowngrade
}
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
err := s.Version().DowngradeCancel(ctx)
if err != nil {
return nil, err
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
return &resp, nil

View File

@ -0,0 +1,23 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package version
import "errors"
var (
ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress")
ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job")
)

View File

@ -15,6 +15,8 @@
package version
import (
"context"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/api/v3/version"
"go.uber.org/zap"
@ -32,7 +34,9 @@ type Server interface {
GetDowngradeInfo() *DowngradeInfo
GetMembersVersions() map[string]*version.Versions
UpdateClusterVersion(string)
DowngradeCancel()
LinearizableReadNotify(ctx context.Context) error
DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error
DowngradeCancel(ctx context.Context) error
GetStorageVersion() *semver.Version
UpdateStorageVersion(semver.Version)
@ -101,7 +105,10 @@ func (m *Monitor) CancelDowngradeIfNeeded() {
v := semver.Must(semver.NewVersion(targetVersion))
if m.versionsMatchTarget(v) {
m.lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
m.s.DowngradeCancel()
err := m.s.DowngradeCancel(context.Background())
if err != nil {
m.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
}
}

View File

@ -1,6 +1,7 @@
package version
import (
"context"
"reflect"
"testing"
@ -339,8 +340,17 @@ func (s *storageMock) UpdateClusterVersion(version string) {
s.clusterVersion = semver.New(version)
}
func (s *storageMock) DowngradeCancel() {
func (s *storageMock) LinearizableReadNotify(ctx context.Context) error {
return nil
}
func (s *storageMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
return nil
}
func (s *storageMock) DowngradeCancel(ctx context.Context) error {
s.downgradeInfo = nil
return nil
}
func (s *storageMock) GetClusterVersion() *semver.Version {

View File

@ -0,0 +1,81 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package version
import (
"context"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
// Manager contains logic to manage etcd cluster version downgrade process.
type Manager struct {
lg *zap.Logger
s Server
}
// NewManager returns a new manager instance
func NewManager(lg *zap.Logger, s Server) *Manager {
return &Manager{
lg: lg,
s: s,
}
}
// DowngradeValidate validates if cluster is downloadable to provided target version and returns error if not.
func (m *Manager) DowngradeValidate(ctx context.Context, targetVersion *semver.Version) error {
// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
err := m.s.LinearizableReadNotify(ctx)
if err != nil {
return err
}
cv := m.s.GetClusterVersion()
allowedTargetVersion := AllowedDowngradeVersion(cv)
if !targetVersion.Equal(*allowedTargetVersion) {
return ErrInvalidDowngradeTargetVersion
}
downgradeInfo := m.s.GetDowngradeInfo()
if downgradeInfo != nil && downgradeInfo.Enabled {
// Todo: return the downgrade status along with the error msg
return ErrDowngradeInProcess
}
return nil
}
// DowngradeEnable initiates etcd cluster version downgrade process.
func (m *Manager) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
// validate downgrade capability before starting downgrade
err := m.DowngradeValidate(ctx, targetVersion)
if err != nil {
return err
}
return m.s.DowngradeEnable(context.Background(), targetVersion)
}
// DowngradeCancel cancels ongoing downgrade process.
func (m *Manager) DowngradeCancel(ctx context.Context) error {
err := m.s.LinearizableReadNotify(ctx)
if err != nil {
return err
}
downgradeInfo := m.s.GetDowngradeInfo()
if !downgradeInfo.Enabled {
return ErrNoInflightDowngrade
}
return m.s.DowngradeCancel(ctx)
}

View File

@ -15,6 +15,7 @@
package version
import (
"context"
"fmt"
"math/rand"
"testing"
@ -73,7 +74,7 @@ func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMoc
serverVersion: ver,
storageVersion: majorMinVer,
}
m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m)
m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m)
cluster.members = append(cluster.members, m)
}
cluster.members[0].isLeader = true
@ -140,8 +141,18 @@ func (m *memberMock) UpdateClusterVersion(version string) {
m.cluster.clusterVersion = *semver.New(version)
}
func (m *memberMock) DowngradeCancel() {
func (m *memberMock) LinearizableReadNotify(ctx context.Context) error {
return nil
}
func (m *memberMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
m.cluster.downgradeInfo = nil
return nil
}
func (m *memberMock) DowngradeCancel(context.Context) error {
m.cluster.downgradeInfo = nil
return nil
}
func (m *memberMock) GetClusterVersion() *semver.Version {