Merge pull request #13132 from serathius/refactor-monitor

etcdserver: Move version monitor logic to separate module
This commit is contained in:
Piotr Tabor
2021-06-24 10:40:01 +02:00
committed by GitHub
6 changed files with 327 additions and 200 deletions

View File

@@ -0,0 +1,44 @@
package etcdserver
import (
"context"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
)
// serverVersionAdapter implements Server interface needed by serverversion.Monitor
type serverVersionAdapter struct {
*EtcdServer
}
var _ serverversion.Server = (*serverVersionAdapter)(nil)
func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
// TODO switch to updateClusterVersionV3 in 3.6
s.GoAttach(func() { s.updateClusterVersionV2(version) })
}
func (s *serverVersionAdapter) DowngradeCancel() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
}
func (s *serverVersionAdapter) GetClusterVersion() *semver.Version {
return s.cluster.Version()
}
func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
return s.cluster.DowngradeInfo()
}
func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
}

View File

@@ -161,44 +161,6 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt
return vers
}
// decideClusterVersion decides the cluster version based on the versions map.
// The returned version is the min server version in the map, or nil if the min
// version in unknown.
func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *semver.Version {
var cv *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))
for mid, ver := range vers {
if ver == nil {
return nil
}
v, err := semver.NewVersion(ver.Server)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return nil
}
if lv.LessThan(*v) {
lg.Warn(
"leader found higher-versioned member",
zap.String("local-member-version", lv.String()),
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
)
}
if cv == nil {
cv = v
} else if v.LessThan(*cv) {
cv = v
}
}
return cv
}
// allowedVersionRange decides the available version range of the cluster that local server can join in;
// if the downgrade enabled status is true, the version window is [oneMinorHigher, oneMinorHigher]
// if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion]
@@ -438,35 +400,6 @@ func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTrip
return false, err
}
// isMatchedVersions returns true if all server versions are equal to target version, otherwise return false.
// It can be used to decide the whether the cluster finishes downgrading to target version.
func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
for mid, ver := range vers {
if ver == nil {
return false
}
v, err := semver.NewVersion(ver.Cluster)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return false
}
if !targetVersion.Equal(*v) {
lg.Warn("remotes server has mismatching etcd version",
zap.String("remote-member-id", mid),
zap.String("current-server-version", v.String()),
zap.String("target-version", targetVersion.String()),
)
return false
}
}
return true
}
func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {

View File

@@ -15,7 +15,6 @@
package etcdserver
import (
"reflect"
"testing"
"go.etcd.io/etcd/api/v3/version"
@@ -27,42 +26,6 @@ import (
var testLogger = zap.NewExample()
func TestDecideClusterVersion(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
wdver *semver.Version
}{
{
map[string]*version.Versions{"a": {Server: "2.0.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
// unknown
{
map[string]*version.Versions{"a": nil},
nil,
},
{
map[string]*version.Versions{"a": {Server: "2.0.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
{
map[string]*version.Versions{"a": {Server: "2.1.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.1.0")),
},
{
map[string]*version.Versions{"a": nil, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
nil,
},
}
for i, tt := range tests {
dver := decideClusterVersion(testLogger, tt.vers)
if !reflect.DeepEqual(dver, tt.wdver) {
t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
}
}
}
func TestIsCompatibleWithVers(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
@@ -215,52 +178,3 @@ func TestDecideAllowedVersionRange(t *testing.T) {
})
}
}
func TestIsMatchedVersions(t *testing.T) {
tests := []struct {
name string
targetVersion *semver.Version
versionMap map[string]*version.Versions
expectedFinished bool
}{
{
"When downgrade finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
true,
},
{
"When cannot parse peer version",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
false,
},
{
"When downgrade not finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap)
if actual != tt.expectedFinished {
t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
}
})
}
}

View File

@@ -62,6 +62,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/mvcc"
@@ -2430,12 +2431,9 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
return s.cluster.Version()
}
// monitorVersions checks the member's version every monitorVersionInterval.
// It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
// TODO switch to updateClusterVersionV3 in 3.6
// monitorVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed.
func (s *EtcdServer) monitorVersions() {
monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s})
for {
select {
case <-s.FirstCommitInTermNotify():
@@ -2447,31 +2445,7 @@ func (s *EtcdServer) monitorVersions() {
if s.Leader() != s.ID() {
continue
}
v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
Major: v.Major,
Minor: v.Minor,
}
}
// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if s.cluster.Version() == nil {
verStr := version.MinClusterVersion
if v != nil {
verStr = v.String()
}
s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
continue
}
if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
}
monitor.UpdateClusterVersionIfNeeded()
}
}
@@ -2551,12 +2525,13 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) {
}
}
// monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed.
func (s *EtcdServer) monitorDowngrade() {
monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s})
t := s.Cfg.DowngradeCheckTime
if t == 0 {
return
}
lg := s.Logger()
for {
select {
case <-time.After(t):
@@ -2567,22 +2542,7 @@ func (s *EtcdServer) monitorDowngrade() {
if !s.isLeader() {
continue
}
d := s.cluster.DowngradeInfo()
if !d.Enabled {
continue
}
targetVersion := d.TargetVersion
v := semver.Must(semver.NewVersion(targetVersion))
if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
}
monitor.CancelDowngradeIfNeeded()
}
}

View File

@@ -0,0 +1,143 @@
package version
import (
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.uber.org/zap"
)
// Monitor contains logic used by cluster leader to monitor version changes and decide on cluster version or downgrade progress.
type Monitor struct {
lg *zap.Logger
s Server
}
// Server lists EtcdServer methods needed by Monitor
type Server interface {
GetClusterVersion() *semver.Version
GetDowngradeInfo() *membership.DowngradeInfo
GetVersions() map[string]*version.Versions
UpdateClusterVersion(string)
DowngradeCancel()
}
func NewMonitor(lg *zap.Logger, storage Server) *Monitor {
return &Monitor{
lg: lg,
s: storage,
}
}
// UpdateClusterVersionIfNeeded updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
func (m *Monitor) UpdateClusterVersionIfNeeded() {
v := m.decideClusterVersion()
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
Major: v.Major,
Minor: v.Minor,
}
}
// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if m.s.GetClusterVersion() == nil {
verStr := version.MinClusterVersion
if v != nil {
verStr = v.String()
}
m.s.UpdateClusterVersion(verStr)
return
}
if v != nil && membership.IsValidVersionChange(m.s.GetClusterVersion(), v) {
m.s.UpdateClusterVersion(v.String())
}
}
func (m *Monitor) CancelDowngradeIfNeeded() {
d := m.s.GetDowngradeInfo()
if !d.Enabled {
return
}
targetVersion := d.TargetVersion
v := semver.Must(semver.NewVersion(targetVersion))
if m.versionsMatchTarget(v) {
m.lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
m.s.DowngradeCancel()
}
}
// decideClusterVersion decides the cluster version based on the versions map.
// The returned version is the min server version in the map, or nil if the min
// version in unknown.
func (m *Monitor) decideClusterVersion() *semver.Version {
vers := m.s.GetVersions()
var cv *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))
for mid, ver := range vers {
if ver == nil {
return nil
}
v, err := semver.NewVersion(ver.Server)
if err != nil {
m.lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return nil
}
if lv.LessThan(*v) {
m.lg.Warn(
"leader found higher-versioned member",
zap.String("local-member-version", lv.String()),
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
)
}
if cv == nil {
cv = v
} else if v.LessThan(*cv) {
cv = v
}
}
return cv
}
// versionsMatchTarget returns true if all server versions are equal to target version, otherwise return false.
// It can be used to decide the whether the cluster finishes downgrading to target version.
func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool {
vers := m.s.GetVersions()
for mid, ver := range vers {
if ver == nil {
return false
}
v, err := semver.NewVersion(ver.Cluster)
if err != nil {
m.lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return false
}
if !targetVersion.Equal(*v) {
m.lg.Warn("remotes server has mismatching etcd version",
zap.String("remote-member-id", mid),
zap.String("current-server-version", v.String()),
zap.String("target-version", targetVersion.String()),
)
return false
}
}
return true
}

View File

@@ -0,0 +1,133 @@
package version
import (
"reflect"
"testing"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
)
var testLogger = zap.NewExample()
func TestDecideClusterVersion(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
wdver *semver.Version
}{
{
map[string]*version.Versions{"a": {Server: "2.0.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
// unknown
{
map[string]*version.Versions{"a": nil},
nil,
},
{
map[string]*version.Versions{"a": {Server: "2.0.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
{
map[string]*version.Versions{"a": {Server: "2.1.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.1.0")),
},
{
map[string]*version.Versions{"a": nil, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
nil,
},
}
for i, tt := range tests {
monitor := NewMonitor(testLogger, &storageMock{
versions: tt.vers,
})
dver := monitor.decideClusterVersion()
if !reflect.DeepEqual(dver, tt.wdver) {
t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
}
}
}
func TestVersionMatchTarget(t *testing.T) {
tests := []struct {
name string
targetVersion *semver.Version
versionMap map[string]*version.Versions
expectedFinished bool
}{
{
"When downgrade finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
true,
},
{
"When cannot parse peer version",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
false,
},
{
"When downgrade not finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
monitor := NewMonitor(testLogger, &storageMock{
versions: tt.versionMap,
})
actual := monitor.versionsMatchTarget(tt.targetVersion)
if actual != tt.expectedFinished {
t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
}
})
}
}
type storageMock struct {
versions map[string]*version.Versions
clusterVersion *semver.Version
downgradeInfo *membership.DowngradeInfo
}
var _ Server = (*storageMock)(nil)
func (s *storageMock) UpdateClusterVersion(version string) {
s.clusterVersion = semver.New(version)
}
func (s *storageMock) DowngradeCancel() {
s.downgradeInfo = nil
}
func (s *storageMock) GetClusterVersion() *semver.Version {
return s.clusterVersion
}
func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo {
return s.downgradeInfo
}
func (s *storageMock) GetVersions() map[string]*version.Versions {
return s.versions
}