Merge pull request #13391 from serathius/downgrade-refactor

Refactor code to make place for downgrade logic
This commit is contained in:
Piotr Tabor 2021-10-08 12:38:25 +02:00 committed by GitHub
commit 5b226e0abf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 779 additions and 342 deletions

View File

@ -18,13 +18,14 @@ import (
"context"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)
// serverVersionAdapter implements Server interface needed by serverversion.Monitor
@ -47,24 +48,32 @@ func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
s.GoAttach(func() { s.updateClusterVersionV2(version) })
}
func (s *serverVersionAdapter) DowngradeCancel() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error {
return s.linearizableReadNotify(ctx)
}
func (s *serverVersionAdapter) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
return err
}
func (s *serverVersionAdapter) DowngradeCancel(ctx context.Context) error {
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
return err
}
func (s *serverVersionAdapter) GetClusterVersion() *semver.Version {
return s.cluster.Version()
}
func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
func (s *serverVersionAdapter) GetDowngradeInfo() *serverversion.DowngradeInfo {
return s.cluster.DowngradeInfo()
}
func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions {
return getMembersVersions(s.lg, s.cluster, s.id, s.peerRt)
}
func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {

View File

@ -18,7 +18,7 @@ import (
"sync"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.uber.org/zap"
"github.com/coreos/go-semver/semver"
@ -64,7 +64,7 @@ func UpdateCapability(lg *zap.Logger, v *semver.Version) {
return
}
enableMapMu.Lock()
if curVersion != nil && !membership.IsValidVersionChange(v, curVersion) {
if curVersion != nil && !serverversion.IsValidVersionChange(v, curVersion) {
enableMapMu.Unlock()
return
}

View File

@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"github.com/coreos/go-semver/semver"
"github.com/prometheus/client_golang/prometheus"
@ -58,7 +59,7 @@ type RaftCluster struct {
// removed id cannot be reused.
removed map[types.ID]bool
downgradeInfo *DowngradeInfo
downgradeInfo *serverversion.DowngradeInfo
versionChanged *notify.Notifier
}
@ -113,7 +114,7 @@ func NewCluster(lg *zap.Logger) *RaftCluster {
lg: lg,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
downgradeInfo: &DowngradeInfo{Enabled: false},
downgradeInfo: &serverversion.DowngradeInfo{Enabled: false},
}
}
@ -268,11 +269,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
d := &DowngradeInfo{Enabled: false}
d := &serverversion.DowngradeInfo{Enabled: false}
if c.downgradeInfo != nil {
d = &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
}
mustDetectDowngrade(c.lg, c.version, d)
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, d)
onSet(c.lg, c.version)
for _, m := range c.members {
@ -540,7 +542,8 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
}
oldVer := c.version
c.version = ver
mustDetectDowngrade(c.lg, c.version, c.downgradeInfo)
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo)
if c.v2store != nil {
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
}
@ -715,22 +718,6 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
return nil
}
// IsValidVersionChange checks the two scenario when version is valid to change:
// 1. Downgrade: cluster version is 1 minor version higher than local version,
// cluster version should change.
// 2. Cluster start: when not all members version are available, cluster version
// is set to MinVersion(3.0), when all members are at higher version, cluster version
// is lower than local version, cluster version should change
func IsValidVersionChange(cv *semver.Version, lv *semver.Version) bool {
cv = &semver.Version{Major: cv.Major, Minor: cv.Minor}
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if isValidDowngrade(cv, lv) || (cv.Major == lv.Major && cv.LessThan(*lv)) {
return true
}
return false
}
// IsLocalMemberLearner returns if the local member is raft learner
func (c *RaftCluster) IsLocalMemberLearner() bool {
c.Lock()
@ -747,17 +734,17 @@ func (c *RaftCluster) IsLocalMemberLearner() bool {
}
// DowngradeInfo returns the downgrade status of the cluster
func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
func (c *RaftCluster) DowngradeInfo() *serverversion.DowngradeInfo {
c.Lock()
defer c.Unlock()
if c.downgradeInfo == nil {
return &DowngradeInfo{Enabled: false}
return &serverversion.DowngradeInfo{Enabled: false}
}
d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
d := &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
return d
}
func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()

View File

@ -21,7 +21,6 @@ import (
"reflect"
"testing"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/client/pkg/v3/testutil"
@ -947,75 +946,3 @@ func TestIsReadyToPromoteMember(t *testing.T) {
}
}
}
func TestIsVersionChangable(t *testing.T) {
v0 := semver.Must(semver.NewVersion("2.4.0"))
v1 := semver.Must(semver.NewVersion("3.4.0"))
v2 := semver.Must(semver.NewVersion("3.5.0"))
v3 := semver.Must(semver.NewVersion("3.5.1"))
v4 := semver.Must(semver.NewVersion("3.6.0"))
tests := []struct {
name string
currentVersion *semver.Version
localVersion *semver.Version
expectedResult bool
}{
{
name: "When local version is one minor lower than cluster version",
currentVersion: v2,
localVersion: v1,
expectedResult: true,
},
{
name: "When local version is one minor and one patch lower than cluster version",
currentVersion: v3,
localVersion: v1,
expectedResult: true,
},
{
name: "When local version is one minor higher than cluster version",
currentVersion: v1,
localVersion: v2,
expectedResult: true,
},
{
name: "When local version is two minor higher than cluster version",
currentVersion: v1,
localVersion: v4,
expectedResult: true,
},
{
name: "When local version is one major higher than cluster version",
currentVersion: v0,
localVersion: v1,
expectedResult: false,
},
{
name: "When local version is equal to cluster version",
currentVersion: v1,
localVersion: v1,
expectedResult: false,
},
{
name: "When local version is one patch higher than cluster version",
currentVersion: v2,
localVersion: v3,
expectedResult: false,
},
{
name: "When local version is two minor lower than cluster version",
currentVersion: v4,
localVersion: v1,
expectedResult: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if ret := IsValidVersionChange(tt.currentVersion, tt.localVersion); ret != tt.expectedResult {
t.Errorf("Expected %v; Got %v", tt.expectedResult, ret)
}
})
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.uber.org/zap"
)
@ -50,5 +51,5 @@ func (b *backendMock) MustSaveMemberToBackend(*Member) {}
func (b *backendMock) TrimMembershipFromBackend() error { return nil }
func (b *backendMock) MustDeleteMemberFromBackend(types.ID) {}
func (b *backendMock) MustSaveDowngradeToBackend(*DowngradeInfo) {}
func (b *backendMock) DowngradeInfoFromBackend() *DowngradeInfo { return nil }
func (b *backendMock) MustSaveDowngradeToBackend(*version.DowngradeInfo) {}
func (b *backendMock) DowngradeInfoFromBackend() *version.DowngradeInfo { return nil }

View File

@ -18,6 +18,7 @@ import (
"path"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
@ -43,8 +44,8 @@ type MemberBackend interface {
}
type DowngradeInfoBackend interface {
MustSaveDowngradeToBackend(*DowngradeInfo)
DowngradeInfoFromBackend() *DowngradeInfo
MustSaveDowngradeToBackend(*version.DowngradeInfo)
DowngradeInfoFromBackend() *version.DowngradeInfo
}
func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc"
@ -58,11 +59,11 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,
lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,

View File

@ -31,6 +31,7 @@ import (
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/mvcc"
@ -946,9 +947,9 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
}
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
d := membership.DowngradeInfo{Enabled: false}
d := version.DowngradeInfo{Enabled: false}
if r.Enabled {
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
d = version.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
}
a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3)
}

View File

@ -529,7 +529,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *m
Node: n,
heartbeat: b.heartbeat,
raftStorage: b.storage,
storage: NewStorage(wal, ss),
storage: serverstorage.NewStorage(b.lg, wal, ss),
},
)
}

View File

@ -134,11 +134,11 @@ func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string {
return us
}
// getVersions returns the versions of the members in the given cluster.
// getMembersVersions returns the versions of the members in the given cluster.
// The key of the returned map is the member's ID. The value of the returned map
// is the semver versions string, including server and cluster.
// If it fails to get the version of a member, the key will be nil.
func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
func getMembersVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
members := cl.Members()
vers := make(map[string]*version.Versions)
for _, m := range members {
@ -184,7 +184,7 @@ func allowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *sem
// out of the range.
// We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(lg, cl, local, rt)
vers := getMembersVersions(lg, cl, local, rt)
minV, maxV := allowedVersionRange(getDowngradeEnabledFromRemotePeers(lg, cl, local, rt))
return isCompatibleWithVers(lg, vers, local, minV, maxV)
}

View File

@ -20,30 +20,27 @@ import (
)
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade")
ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress")
ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job")
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version not found during downgrade")
ErrWrongDowngradeVersionFormat = errors.New("etcdserver: wrong downgrade target version format")
)
type DiscoveryError struct {

View File

@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.uber.org/zap"
)
@ -102,7 +103,7 @@ type raftNodeConfig struct {
isIDRemoved func(id uint64) bool
raft.Node
raftStorage *raft.MemoryStorage
storage Storage
storage serverstorage.Storage
heartbeat time.Duration // for logging
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since

View File

@ -636,7 +636,7 @@ type ServerPeerV2 interface {
DowngradeEnabledHandler() http.Handler
}
func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() }
func (s *EtcdServer) DowngradeInfo() *serverversion.DowngradeInfo { return s.cluster.DowngradeInfo() }
type downgradeEnabledHandler struct {
lg *zap.Logger
@ -2359,3 +2359,7 @@ func (s *EtcdServer) IsMemberExist(id types.ID) bool {
func (s *EtcdServer) raftStatus() raft.Status {
return s.r.Node.Status()
}
func (s *EtcdServer) Version() *serverversion.Manager {
return serverversion.NewManager(s.Logger(), newServerVersionAdapter(s))
}

View File

@ -23,7 +23,6 @@ import (
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
@ -919,48 +918,27 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
return nil, err
}
// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
cv := s.ClusterVersion()
if cv == nil {
return nil, ErrClusterVersionUnavailable
}
resp.Version = cv.String()
allowedTargetVersion := membership.AllowedDowngradeVersion(cv)
if !targetVersion.Equal(*allowedTargetVersion) {
return nil, ErrInvalidDowngradeTargetVersion
err = s.Version().DowngradeValidate(ctx, targetVersion)
if err != nil {
return nil, err
}
downgradeInfo := s.cluster.DowngradeInfo()
if downgradeInfo.Enabled {
// Todo: return the downgrade status along with the error msg
return nil, ErrDowngradeInProcess
}
return resp, nil
}
func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
// validate downgrade capability before starting downgrade
v := r.Version
lg := s.Logger()
if resp, err := s.downgradeValidate(ctx, v); err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return resp, err
}
targetVersion, err := convertToClusterVersion(v)
targetVersion, err := convertToClusterVersion(r.Version)
if err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return nil, err
}
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
_, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
err = s.Version().DowngradeEnable(ctx, targetVersion)
if err != nil {
lg.Warn("reject downgrade request", zap.Error(err))
return nil, err
@ -970,21 +948,9 @@ func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest
}
func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
if err := s.linearizableReadNotify(ctx); err != nil {
return nil, err
}
downgradeInfo := s.cluster.DowngradeInfo()
if !downgradeInfo.Enabled {
return nil, ErrNoInflightDowngrade
}
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
err := s.Version().DowngradeCancel(ctx)
if err != nil {
return nil, err
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
return &resp, nil

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package membership
package version
import (
"github.com/coreos/go-semver/semver"
@ -34,47 +34,62 @@ func (d *DowngradeInfo) GetTargetVersion() *semver.Version {
// isValidDowngrade verifies whether the cluster can be downgraded from verFrom to verTo
func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool {
return verTo.Equal(*AllowedDowngradeVersion(verFrom))
return verTo.Equal(*allowedDowngradeVersion(verFrom))
}
// mustDetectDowngrade will detect unexpected downgrade when the local server is recovered.
func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) {
lv := semver.Must(semver.NewVersion(version.Version))
// MustDetectDowngrade will detect unexpected downgrade when the local server is recovered.
func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) {
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
sv = &semver.Version{Major: sv.Major, Minor: sv.Minor}
// if the cluster enables downgrade, check local version against downgrade target version.
if d != nil && d.Enabled && d.TargetVersion != "" {
if lv.Equal(*d.GetTargetVersion()) {
if sv.Equal(*d.GetTargetVersion()) {
if cv != nil {
lg.Info(
"cluster is downgrading to target version",
zap.String("target-cluster-version", d.TargetVersion),
zap.String("determined-cluster-version", version.Cluster(cv.String())),
zap.String("current-server-version", version.Version),
zap.String("current-server-version", sv.String()),
)
}
return
}
lg.Fatal(
lg.Panic(
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
zap.String("current-server-version", version.Version),
zap.String("current-server-version", sv.String()),
zap.String("target-cluster-version", d.TargetVersion),
)
}
// if the cluster disables downgrade, check local version against determined cluster version.
// the validation passes when local version is not less than cluster version
if cv != nil && lv.LessThan(*cv) {
lg.Fatal(
if cv != nil && sv.LessThan(*cv) {
lg.Panic(
"invalid downgrade; server version is lower than determined cluster version",
zap.String("current-server-version", version.Version),
zap.String("current-server-version", sv.String()),
zap.String("determined-cluster-version", version.Cluster(cv.String())),
)
}
}
func AllowedDowngradeVersion(ver *semver.Version) *semver.Version {
func allowedDowngradeVersion(ver *semver.Version) *semver.Version {
// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x)
return &semver.Version{Major: ver.Major, Minor: ver.Minor - 1}
}
// IsValidVersionChange checks the two scenario when version is valid to change:
// 1. Downgrade: cluster version is 1 minor version higher than local version,
// cluster version should change.
// 2. Cluster start: when not all members version are available, cluster version
// is set to MinVersion(3.0), when all members are at higher version, cluster version
// is lower than local version, cluster version should change
func IsValidVersionChange(cv *semver.Version, lv *semver.Version) bool {
cv = &semver.Version{Major: cv.Major, Minor: cv.Minor}
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
if isValidDowngrade(cv, lv) || (cv.Major == lv.Major && cv.LessThan(*lv)) {
return true
}
return false
}

View File

@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package membership
package version
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"testing"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/api/v3/version"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
func TestMustDetectDowngrade(t *testing.T) {
@ -112,57 +107,31 @@ func TestMustDetectDowngrade(t *testing.T) {
},
}
if os.Getenv("DETECT_DOWNGRADE") != "" {
i := os.Getenv("DETECT_DOWNGRADE")
iint, _ := strconv.Atoi(i)
logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-must-detect-downgrade-%v", iint))
lcfg := zap.NewProductionConfig()
lcfg.OutputPaths = []string{logPath}
lcfg.ErrorOutputPaths = []string{logPath}
lg, _ := lcfg.Build()
mustDetectDowngrade(lg, tests[iint].clusterVersion, tests[iint].downgrade)
return
}
for i, tt := range tests {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-must-detect-downgrade-%d", i))
t.Log(logPath)
defer os.RemoveAll(logPath)
lg := zaptest.NewLogger(t)
sv := semver.Must(semver.NewVersion(version.Version))
err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion, tt.downgrade)
cmd := exec.Command(os.Args[0], "-test.run=TestMustDetectDowngrade")
cmd.Env = append(os.Environ(), fmt.Sprintf("DETECT_DOWNGRADE=%d", i))
if err := cmd.Start(); err != nil {
t.Fatal(err)
if tt.success != (err == nil) {
t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success)
// TODO test err
}
errCmd := cmd.Wait()
data, err := ioutil.ReadFile(logPath)
if err == nil {
if !bytes.Contains(data, []byte(tt.message)) {
t.Errorf("Expected to find %v in log", tt.message)
}
} else {
t.Fatal(err)
}
if !tt.success {
e, ok := errCmd.(*exec.ExitError)
if !ok || e.Success() {
t.Errorf("Expected exit with status 1; Got %v", err)
}
}
if tt.success && errCmd != nil {
t.Errorf("Expected not failure; Got %v", errCmd)
if err != nil && tt.message != fmt.Sprintf("%s", err) {
t.Errorf("Unexpected message, got %q, wanted: %v", err, tt.message)
}
})
}
}
func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) (err interface{}) {
defer func() {
err = recover()
}()
MustDetectDowngrade(lg, sv, cv, d)
return err
}
func TestIsValidDowngrade(t *testing.T) {
tests := []struct {
name string
@ -193,3 +162,75 @@ func TestIsValidDowngrade(t *testing.T) {
})
}
}
func TestIsVersionChangable(t *testing.T) {
v0 := semver.Must(semver.NewVersion("2.4.0"))
v1 := semver.Must(semver.NewVersion("3.4.0"))
v2 := semver.Must(semver.NewVersion("3.5.0"))
v3 := semver.Must(semver.NewVersion("3.5.1"))
v4 := semver.Must(semver.NewVersion("3.6.0"))
tests := []struct {
name string
currentVersion *semver.Version
localVersion *semver.Version
expectedResult bool
}{
{
name: "When local version is one minor lower than cluster version",
currentVersion: v2,
localVersion: v1,
expectedResult: true,
},
{
name: "When local version is one minor and one patch lower than cluster version",
currentVersion: v3,
localVersion: v1,
expectedResult: true,
},
{
name: "When local version is one minor higher than cluster version",
currentVersion: v1,
localVersion: v2,
expectedResult: true,
},
{
name: "When local version is two minor higher than cluster version",
currentVersion: v1,
localVersion: v4,
expectedResult: true,
},
{
name: "When local version is one major higher than cluster version",
currentVersion: v0,
localVersion: v1,
expectedResult: false,
},
{
name: "When local version is equal to cluster version",
currentVersion: v1,
localVersion: v1,
expectedResult: false,
},
{
name: "When local version is one patch higher than cluster version",
currentVersion: v2,
localVersion: v3,
expectedResult: false,
},
{
name: "When local version is two minor lower than cluster version",
currentVersion: v4,
localVersion: v1,
expectedResult: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if ret := IsValidVersionChange(tt.currentVersion, tt.localVersion); ret != tt.expectedResult {
t.Errorf("Expected %v; Got %v", tt.expectedResult, ret)
}
})
}
}

View File

@ -0,0 +1,23 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package version
import "errors"
var (
ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid downgrade target version")
ErrDowngradeInProcess = errors.New("etcdserver: cluster has a downgrade job in progress")
ErrNoInflightDowngrade = errors.New("etcdserver: no inflight downgrade job")
)

View File

@ -15,9 +15,10 @@
package version
import (
"context"
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.uber.org/zap"
)
@ -30,10 +31,12 @@ type Monitor struct {
// Server lists EtcdServer methods needed by Monitor
type Server interface {
GetClusterVersion() *semver.Version
GetDowngradeInfo() *membership.DowngradeInfo
GetVersions() map[string]*version.Versions
GetDowngradeInfo() *DowngradeInfo
GetMembersVersions() map[string]*version.Versions
UpdateClusterVersion(string)
DowngradeCancel()
LinearizableReadNotify(ctx context.Context) error
DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error
DowngradeCancel(ctx context.Context) error
GetStorageVersion() *semver.Version
UpdateStorageVersion(semver.Version)
@ -49,34 +52,29 @@ func NewMonitor(lg *zap.Logger, storage Server) *Monitor {
}
}
// UpdateClusterVersionIfNeeded updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
// UpdateClusterVersionIfNeeded updates the cluster version.
func (m *Monitor) UpdateClusterVersionIfNeeded() {
v := m.decideClusterVersion()
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
Major: v.Major,
Minor: v.Minor,
}
newClusterVersion := m.decideClusterVersion()
if newClusterVersion != nil {
newClusterVersion = &semver.Version{Major: newClusterVersion.Major, Minor: newClusterVersion.Minor}
m.s.UpdateClusterVersion(newClusterVersion.String())
}
}
// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if m.s.GetClusterVersion() == nil {
verStr := version.MinClusterVersion
if v != nil {
verStr = v.String()
// decideClusterVersion decides the cluster version based on the members versions if all members agree on a higher one.
func (m *Monitor) decideClusterVersion() *semver.Version {
clusterVersion := m.s.GetClusterVersion()
membersMinimalVersion := m.membersMinimalVersion()
if clusterVersion == nil {
if membersMinimalVersion != nil {
return membersMinimalVersion
}
m.s.UpdateClusterVersion(verStr)
return
return semver.New(version.MinClusterVersion)
}
if v != nil && membership.IsValidVersionChange(m.s.GetClusterVersion(), v) {
m.s.UpdateClusterVersion(v.String())
if membersMinimalVersion != nil && clusterVersion.LessThan(*membersMinimalVersion) && IsValidVersionChange(clusterVersion, membersMinimalVersion) {
return membersMinimalVersion
}
return nil
}
// UpdateStorageVersionIfNeeded updates the storage version if it differs from cluster version.
@ -99,7 +97,7 @@ func (m *Monitor) UpdateStorageVersionIfNeeded() {
func (m *Monitor) CancelDowngradeIfNeeded() {
d := m.s.GetDowngradeInfo()
if !d.Enabled {
if d == nil || !d.Enabled {
return
}
@ -107,16 +105,20 @@ func (m *Monitor) CancelDowngradeIfNeeded() {
v := semver.Must(semver.NewVersion(targetVersion))
if m.versionsMatchTarget(v) {
m.lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
m.s.DowngradeCancel()
err := m.s.DowngradeCancel(context.Background())
if err != nil {
m.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
}
}
// decideClusterVersion decides the cluster version based on the versions map.
// The returned version is the min server version in the map, or nil if the min
// membersMinimalVersion returns the min server version in the map, or nil if the min
// version in unknown.
func (m *Monitor) decideClusterVersion() *semver.Version {
vers := m.s.GetVersions()
var cv *semver.Version
// It prints out log if there is a member with a higher version than the
// local version.
func (m *Monitor) membersMinimalVersion() *semver.Version {
vers := m.s.GetMembersVersions()
var minV *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))
for mid, ver := range vers {
@ -141,19 +143,19 @@ func (m *Monitor) decideClusterVersion() *semver.Version {
zap.String("remote-member-version", ver.Server),
)
}
if cv == nil {
cv = v
} else if v.LessThan(*cv) {
cv = v
if minV == nil {
minV = v
} else if v.LessThan(*minV) {
minV = v
}
}
return cv
return minV
}
// versionsMatchTarget returns true if all server versions are equal to target version, otherwise return false.
// It can be used to decide the whether the cluster finishes downgrading to target version.
func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool {
vers := m.s.GetVersions()
vers := m.s.GetMembersVersions()
for mid, ver := range vers {
if ver == nil {
return false

View File

@ -1,27 +1,27 @@
package version
import (
"context"
"reflect"
"testing"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
)
var testLogger = zap.NewExample()
var (
V3_0 = semver.Version{Major: 3, Minor: 0}
V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6}
)
func TestDecideClusterVersion(t *testing.T) {
func TestMemberMinimalVersion(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
wdver *semver.Version
memberVersions map[string]*version.Versions
wantVersion *semver.Version
}{
{
map[string]*version.Versions{"a": {Server: "2.0.0"}},
@ -47,12 +47,12 @@ func TestDecideClusterVersion(t *testing.T) {
}
for i, tt := range tests {
monitor := NewMonitor(testLogger, &storageMock{
versions: tt.vers,
monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{
memberVersions: tt.memberVersions,
})
dver := monitor.decideClusterVersion()
if !reflect.DeepEqual(dver, tt.wdver) {
t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
minV := monitor.membersMinimalVersion()
if !reflect.DeepEqual(minV, tt.wantVersion) {
t.Errorf("#%d: ver = %+v, want %+v", i, minV, tt.wantVersion)
}
}
}
@ -97,7 +97,7 @@ func TestDecideStorageVersion(t *testing.T) {
clusterVersion: tt.clusterVersion,
storageVersion: tt.storageVersion,
}
monitor := NewMonitor(testLogger, s)
monitor := NewMonitor(zaptest.NewLogger(t), s)
monitor.UpdateStorageVersionIfNeeded()
if !reflect.DeepEqual(s.storageVersion, tt.expectStorageVersion) {
t.Errorf("Unexpected storage version value, got = %+v, want %+v", s.storageVersion, tt.expectStorageVersion)
@ -147,8 +147,8 @@ func TestVersionMatchTarget(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
monitor := NewMonitor(testLogger, &storageMock{
versions: tt.versionMap,
monitor := NewMonitor(zaptest.NewLogger(t), &storageMock{
memberVersions: tt.versionMap,
})
actual := monitor.versionsMatchTarget(tt.targetVersion)
if actual != tt.expectedFinished {
@ -158,11 +158,179 @@ func TestVersionMatchTarget(t *testing.T) {
}
}
func TestUpdateClusterVersionIfNeeded(t *testing.T) {
tests := []struct {
name string
clusterVersion *semver.Version
memberVersions map[string]*version.Versions
downgrade *DowngradeInfo
expectClusterVersion *semver.Version
}{
{
name: "Default to 3.0 if there are no members",
expectClusterVersion: &V3_0,
},
{
name: "Should pick lowest server version from members",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.7.0", Server: "3.6.0"},
"b": {Cluster: "3.4.0", Server: "3.5.0"},
},
expectClusterVersion: &V3_5,
},
{
name: "Sets minimal version when member has broken version",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.7.0", Server: "3.6.0"},
"b": {Cluster: "xxxx", Server: "yyyy"},
},
expectClusterVersion: &V3_0,
},
{
name: "Should pick lowest server version from members (cv already set)",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.7.0", Server: "3.6.0"},
"b": {Cluster: "3.4.0", Server: "3.5.0"},
},
clusterVersion: &V3_5,
expectClusterVersion: &V3_5,
},
{
name: "Should upgrade cluster version if all members have upgraded (have higher server version)",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.5.0", Server: "3.6.0"},
"b": {Cluster: "3.5.0", Server: "3.6.0"},
},
clusterVersion: &V3_5,
expectClusterVersion: &V3_6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &storageMock{
clusterVersion: tt.clusterVersion,
memberVersions: tt.memberVersions,
downgradeInfo: tt.downgrade,
}
monitor := NewMonitor(zaptest.NewLogger(t), s)
// Run multiple times to ensure that results are stable
for i := 0; i < 3; i++ {
monitor.UpdateClusterVersionIfNeeded()
assert.Equal(t, tt.expectClusterVersion, s.clusterVersion)
}
})
}
}
func TestCancelDowngradeIfNeeded(t *testing.T) {
tests := []struct {
name string
memberVersions map[string]*version.Versions
downgrade *DowngradeInfo
expectDowngrade *DowngradeInfo
}{
{
name: "No action if there no downgrade in progress",
},
{
name: "Cancel downgrade if there are no members",
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
expectDowngrade: nil,
},
// Next entries go through all states that should happen during downgrade
{
name: "No action if downgrade was not started",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.6.0", Server: "3.6.1"},
"b": {Cluster: "3.6.0", Server: "3.6.2"},
},
},
{
name: "Cancel downgrade if all members have downgraded",
memberVersions: map[string]*version.Versions{
"a": {Cluster: "3.5.0", Server: "3.5.1"},
"b": {Cluster: "3.5.0", Server: "3.5.2"},
},
downgrade: &DowngradeInfo{TargetVersion: "3.5.0", Enabled: true},
expectDowngrade: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &storageMock{
memberVersions: tt.memberVersions,
downgradeInfo: tt.downgrade,
}
monitor := NewMonitor(zaptest.NewLogger(t), s)
// Run multiple times to ensure that results are stable
for i := 0; i < 3; i++ {
monitor.CancelDowngradeIfNeeded()
assert.Equal(t, tt.expectDowngrade, s.downgradeInfo)
}
})
}
}
func TestUpdateStorageVersionIfNeeded(t *testing.T) {
tests := []struct {
name string
clusterVersion *semver.Version
storageVersion *semver.Version
expectStorageVersion *semver.Version
}{
{
name: "No action if cluster version is nil",
},
{
name: "Should set storage version if cluster version is set",
clusterVersion: &V3_5,
expectStorageVersion: &V3_5,
},
{
name: "No action if storage version was already set",
storageVersion: &V3_5,
expectStorageVersion: &V3_5,
},
{
name: "No action if storage version equals cluster version",
clusterVersion: &V3_5,
storageVersion: &V3_5,
expectStorageVersion: &V3_5,
},
{
name: "Should set storage version to cluster version",
clusterVersion: &V3_6,
storageVersion: &V3_5,
expectStorageVersion: &V3_6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &storageMock{
clusterVersion: tt.clusterVersion,
storageVersion: tt.storageVersion,
}
monitor := NewMonitor(zaptest.NewLogger(t), s)
// Run multiple times to ensure that results are stable
for i := 0; i < 3; i++ {
monitor.UpdateStorageVersionIfNeeded()
assert.Equal(t, tt.expectStorageVersion, s.storageVersion)
}
})
}
}
type storageMock struct {
versions map[string]*version.Versions
memberVersions map[string]*version.Versions
clusterVersion *semver.Version
storageVersion *semver.Version
downgradeInfo *membership.DowngradeInfo
downgradeInfo *DowngradeInfo
locked bool
}
@ -172,20 +340,29 @@ func (s *storageMock) UpdateClusterVersion(version string) {
s.clusterVersion = semver.New(version)
}
func (s *storageMock) DowngradeCancel() {
func (s *storageMock) LinearizableReadNotify(ctx context.Context) error {
return nil
}
func (s *storageMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
return nil
}
func (s *storageMock) DowngradeCancel(ctx context.Context) error {
s.downgradeInfo = nil
return nil
}
func (s *storageMock) GetClusterVersion() *semver.Version {
return s.clusterVersion
}
func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo {
func (s *storageMock) GetDowngradeInfo() *DowngradeInfo {
return s.downgradeInfo
}
func (s *storageMock) GetVersions() map[string]*version.Versions {
return s.versions
func (s *storageMock) GetMembersVersions() map[string]*version.Versions {
return s.memberVersions
}
func (s *storageMock) GetStorageVersion() *semver.Version {

View File

@ -0,0 +1,81 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package version
import (
"context"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
// Manager contains logic to manage etcd cluster version downgrade process.
type Manager struct {
lg *zap.Logger
s Server
}
// NewManager returns a new manager instance
func NewManager(lg *zap.Logger, s Server) *Manager {
return &Manager{
lg: lg,
s: s,
}
}
// DowngradeValidate validates if cluster is downloadable to provided target version and returns error if not.
func (m *Manager) DowngradeValidate(ctx context.Context, targetVersion *semver.Version) error {
// gets leaders commit index and wait for local store to finish applying that index
// to avoid using stale downgrade information
err := m.s.LinearizableReadNotify(ctx)
if err != nil {
return err
}
cv := m.s.GetClusterVersion()
allowedTargetVersion := allowedDowngradeVersion(cv)
if !targetVersion.Equal(*allowedTargetVersion) {
return ErrInvalidDowngradeTargetVersion
}
downgradeInfo := m.s.GetDowngradeInfo()
if downgradeInfo != nil && downgradeInfo.Enabled {
// Todo: return the downgrade status along with the error msg
return ErrDowngradeInProcess
}
return nil
}
// DowngradeEnable initiates etcd cluster version downgrade process.
func (m *Manager) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
// validate downgrade capability before starting downgrade
err := m.DowngradeValidate(ctx, targetVersion)
if err != nil {
return err
}
return m.s.DowngradeEnable(context.Background(), targetVersion)
}
// DowngradeCancel cancels ongoing downgrade process.
func (m *Manager) DowngradeCancel(ctx context.Context) error {
err := m.s.LinearizableReadNotify(ctx)
if err != nil {
return err
}
downgradeInfo := m.s.GetDowngradeInfo()
if !downgradeInfo.Enabled {
return ErrNoInflightDowngrade
}
return m.s.DowngradeCancel(ctx)
}

View File

@ -0,0 +1,188 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package version
import (
"context"
"fmt"
"math/rand"
"testing"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/api/v3/version"
)
var (
V3_7 = semver.Version{Major: 3, Minor: 7}
)
func TestUpgradeSingleNode(t *testing.T) {
lg := zaptest.NewLogger(t)
c := newCluster(lg, 1, V3_6)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 1, V3_6), c)
c.ReplaceMemberBinary(0, V3_7)
c.StepMonitors()
c.StepMonitors()
assert.Equal(t, newCluster(lg, 1, V3_7), c)
}
func TestUpgradeThreeNodes(t *testing.T) {
lg := zaptest.NewLogger(t)
c := newCluster(lg, 3, V3_6)
c.StepMonitors()
assert.Equal(t, newCluster(lg, 3, V3_6), c)
c.ReplaceMemberBinary(0, V3_7)
c.StepMonitors()
c.ReplaceMemberBinary(1, V3_7)
c.StepMonitors()
c.ReplaceMemberBinary(2, V3_7)
c.StepMonitors()
c.StepMonitors()
assert.Equal(t, newCluster(lg, 3, V3_7), c)
}
func newCluster(lg *zap.Logger, memberCount int, ver semver.Version) *clusterMock {
cluster := &clusterMock{
lg: lg,
clusterVersion: ver,
members: make([]*memberMock, 0, memberCount),
}
majorMinVer := semver.Version{Major: ver.Major, Minor: ver.Minor}
for i := 0; i < memberCount; i++ {
m := &memberMock{
cluster: cluster,
serverVersion: ver,
storageVersion: majorMinVer,
}
m.monitor = NewMonitor(lg.Named(fmt.Sprintf("m%d", i)), m)
cluster.members = append(cluster.members, m)
}
cluster.members[0].isLeader = true
return cluster
}
func (c *clusterMock) StepMonitors() {
// Execute monitor functions in random order as it is not guaranteed
fs := []func(){}
for _, m := range c.members {
fs = append(fs, m.monitor.UpdateStorageVersionIfNeeded)
if m.isLeader {
fs = append(fs, m.monitor.CancelDowngradeIfNeeded, m.monitor.UpdateClusterVersionIfNeeded)
}
}
rand.Shuffle(len(fs), func(i, j int) {
fs[i], fs[j] = fs[j], fs[i]
})
for _, f := range fs {
f()
}
}
type clusterMock struct {
lg *zap.Logger
clusterVersion semver.Version
downgradeInfo *DowngradeInfo
members []*memberMock
}
func (c *clusterMock) Version() *Manager {
return NewManager(c.lg, c.members[0])
}
func (c *clusterMock) MembersVersions() map[string]*version.Versions {
result := map[string]*version.Versions{}
for i, m := range c.members {
result[fmt.Sprintf("%d", i)] = &version.Versions{
Server: m.serverVersion.String(),
Cluster: c.clusterVersion.String(),
}
}
return result
}
func (c *clusterMock) ReplaceMemberBinary(mid int, newServerVersion semver.Version) {
MustDetectDowngrade(c.lg, &c.members[mid].serverVersion, &c.clusterVersion, c.downgradeInfo)
c.members[mid].serverVersion = newServerVersion
}
type memberMock struct {
cluster *clusterMock
isLeader bool
serverVersion semver.Version
storageVersion semver.Version
monitor *Monitor
}
var _ Server = (*memberMock)(nil)
func (m *memberMock) UpdateClusterVersion(version string) {
m.cluster.clusterVersion = *semver.New(version)
}
func (m *memberMock) LinearizableReadNotify(ctx context.Context) error {
return nil
}
func (m *memberMock) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
m.cluster.downgradeInfo = &DowngradeInfo{
TargetVersion: targetVersion.String(),
Enabled: true,
}
return nil
}
func (m *memberMock) DowngradeCancel(context.Context) error {
m.cluster.downgradeInfo = nil
return nil
}
func (m *memberMock) GetClusterVersion() *semver.Version {
return &m.cluster.clusterVersion
}
func (m *memberMock) GetDowngradeInfo() *DowngradeInfo {
return m.cluster.downgradeInfo
}
func (m *memberMock) GetMembersVersions() map[string]*version.Versions {
return m.cluster.MembersVersions()
}
func (m *memberMock) GetStorageVersion() *semver.Version {
return &m.storageVersion
}
func (m *memberMock) UpdateStorageVersion(v semver.Version) {
m.storageVersion = v
}
func (m *memberMock) TriggerSnapshot() {
}
func (m *memberMock) Lock() {
}
func (m *memberMock) Unlock() {
}

View File

@ -20,6 +20,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"github.com/coreos/go-semver/semver"
@ -152,7 +153,7 @@ func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version)
// MustSaveDowngradeToBackend saves downgrade info to backend.
// The field is populated since etcd v3.5.
func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *membership.DowngradeInfo) {
func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.DowngradeInfo) {
dkey := ClusterDowngradeKeyName
dvalue, err := json.Marshal(downgrade)
if err != nil {
@ -203,7 +204,7 @@ func (s *membershipBackend) ClusterVersionFromBackend() *semver.Version {
// DowngradeInfoFromBackend reads downgrade info from backend.
// The field is populated since etcd v3.5.
func (s *membershipBackend) DowngradeInfoFromBackend() *membership.DowngradeInfo {
func (s *membershipBackend) DowngradeInfoFromBackend() *version.DowngradeInfo {
dkey := ClusterDowngradeKeyName
tx := s.be.ReadTx()
tx.Lock()
@ -219,7 +220,7 @@ func (s *membershipBackend) DowngradeInfoFromBackend() *membership.DowngradeInfo
zap.Int("number-of-key", len(keys)),
)
}
var d membership.DowngradeInfo
var d version.DowngradeInfo
if err := json.Unmarshal(vals[0], &d); err != nil {
s.lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
}

View File

@ -28,6 +28,7 @@ import (
)
var (
V3_4 = semver.Version{Major: 3, Minor: 4}
V3_7 = semver.Version{Major: 3, Minor: 7}
)

View File

@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package storage
import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.uber.org/zap"
)
type Storage interface {
@ -36,12 +37,13 @@ type Storage interface {
}
type storage struct {
*wal.WAL
*snap.Snapshotter
lg *zap.Logger
s *snap.Snapshotter
w *wal.WAL
}
func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
return &storage{w, s}
func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage {
return &storage{lg: lg, w: w, s: s}
}
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
@ -54,21 +56,33 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
// save the snapshot file before writing the snapshot to the wal.
// This makes it possible for the snapshot file to become orphaned, but prevents
// a WAL snapshot entry from having no corresponding snapshot file.
err := st.Snapshotter.SaveSnap(snap)
err := st.s.SaveSnap(snap)
if err != nil {
return err
}
// gofail: var raftBeforeWALSaveSnaphot struct{}
return st.WAL.SaveSnapshot(walsnap)
return st.w.SaveSnapshot(walsnap)
}
// Release releases resources older than the given snap and are no longer needed:
// - releases the locks to the wal files that are older than the provided wal for the given snap.
// - deletes any .snap.db files that are older than the given snap.
func (st *storage) Release(snap raftpb.Snapshot) error {
if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil {
if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil {
return err
}
return st.Snapshotter.ReleaseSnapDBs(snap)
return st.s.ReleaseSnapDBs(snap)
}
func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error {
return st.w.Save(s, ents)
}
func (st *storage) Close() error {
return st.w.Close()
}
func (st *storage) Sync() error {
return st.w.Sync()
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
package wal
import (
"fmt"
@ -26,14 +26,13 @@ import (
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/wal"
)
// MinimalStorageVersionFromWAL returns minimal etcd storage able to interpret provided WAL log,
// MinimalEtcdVersion returns minimal etcd able to interpret provided WAL log,
// determined by looking at entries since the last snapshot and returning the highest
// etcd version annotation from used messages, fields, enums and their values.
func MinimalStorageVersionFromWAL(wal *wal.WAL) *semver.Version {
_, _, ents, err := wal.ReadAll()
func (w *WAL) MinimalEtcdVersion() *semver.Version {
_, _, ents, err := w.ReadAll()
if err != nil {
panic(err)
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package schema
package wal
import (
"fmt"
@ -21,7 +21,6 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/pkg/v3/pbutil"
@ -33,6 +32,8 @@ var (
V3_1 = semver.Version{Major: 3, Minor: 1}
V3_3 = semver.Version{Major: 3, Minor: 3}
V3_4 = semver.Version{Major: 3, Minor: 4}
V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6}
)
func TestEtcdVersionFromEntry(t *testing.T) {

View File

@ -26,7 +26,6 @@ import (
"go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
)
@ -59,11 +58,11 @@ func TestEtcdVersionFromWAL(t *testing.T) {
cli.Close()
srv.Close()
wal, err := wal.Open(zap.NewNop(), cfg.Dir+"/member/wal", walpb.Snapshot{})
w, err := wal.Open(zap.NewNop(), cfg.Dir+"/member/wal", walpb.Snapshot{})
if err != nil {
panic(err)
}
defer wal.Close()
ver := schema.MinimalStorageVersionFromWAL(wal)
defer w.Close()
ver := w.MinimalEtcdVersion()
assert.Equal(t, &semver.Version{Major: 3, Minor: 5}, ver)
}