diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go new file mode 100644 index 000000000..4e97c430c --- /dev/null +++ b/server/etcdserver/adapters.go @@ -0,0 +1,44 @@ +package etcdserver + +import ( + "context" + + "github.com/coreos/go-semver/semver" + "go.uber.org/zap" + + "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" +) + +// serverVersionAdapter implements Server interface needed by serverversion.Monitor +type serverVersionAdapter struct { + *EtcdServer +} + +var _ serverversion.Server = (*serverVersionAdapter)(nil) + +func (s *serverVersionAdapter) UpdateClusterVersion(version string) { + // TODO switch to updateClusterVersionV3 in 3.6 + s.GoAttach(func() { s.updateClusterVersionV2(version) }) +} + +func (s *serverVersionAdapter) DowngradeCancel() { + ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + if _, err := s.downgradeCancel(ctx); err != nil { + s.lg.Warn("failed to cancel downgrade", zap.Error(err)) + } + cancel() +} + +func (s *serverVersionAdapter) GetClusterVersion() *semver.Version { + return s.cluster.Version() +} + +func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo { + return s.cluster.DowngradeInfo() +} + +func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions { + return getVersions(s.lg, s.cluster, s.id, s.peerRt) +} diff --git a/server/etcdserver/cluster_util.go b/server/etcdserver/cluster_util.go index 595586e20..61028a898 100644 --- a/server/etcdserver/cluster_util.go +++ b/server/etcdserver/cluster_util.go @@ -161,44 +161,6 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt return vers } -// decideClusterVersion decides the cluster version based on the versions map. -// The returned version is the min server version in the map, or nil if the min -// version in unknown. -func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *semver.Version { - var cv *semver.Version - lv := semver.Must(semver.NewVersion(version.Version)) - - for mid, ver := range vers { - if ver == nil { - return nil - } - v, err := semver.NewVersion(ver.Server) - if err != nil { - lg.Warn( - "failed to parse server version of remote member", - zap.String("remote-member-id", mid), - zap.String("remote-member-version", ver.Server), - zap.Error(err), - ) - return nil - } - if lv.LessThan(*v) { - lg.Warn( - "leader found higher-versioned member", - zap.String("local-member-version", lv.String()), - zap.String("remote-member-id", mid), - zap.String("remote-member-version", ver.Server), - ) - } - if cv == nil { - cv = v - } else if v.LessThan(*cv) { - cv = v - } - } - return cv -} - // allowedVersionRange decides the available version range of the cluster that local server can join in; // if the downgrade enabled status is true, the version window is [oneMinorHigher, oneMinorHigher] // if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion] @@ -438,35 +400,6 @@ func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTrip return false, err } -// isMatchedVersions returns true if all server versions are equal to target version, otherwise return false. -// It can be used to decide the whether the cluster finishes downgrading to target version. -func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool { - for mid, ver := range vers { - if ver == nil { - return false - } - v, err := semver.NewVersion(ver.Cluster) - if err != nil { - lg.Warn( - "failed to parse server version of remote member", - zap.String("remote-member-id", mid), - zap.String("remote-member-version", ver.Server), - zap.Error(err), - ) - return false - } - if !targetVersion.Equal(*v) { - lg.Warn("remotes server has mismatching etcd version", - zap.String("remote-member-id", mid), - zap.String("current-server-version", v.String()), - zap.String("target-version", targetVersion.String()), - ) - return false - } - } - return true -} - func convertToClusterVersion(v string) (*semver.Version, error) { ver, err := semver.NewVersion(v) if err != nil { diff --git a/server/etcdserver/cluster_util_test.go b/server/etcdserver/cluster_util_test.go index f2196b84d..0397634d9 100644 --- a/server/etcdserver/cluster_util_test.go +++ b/server/etcdserver/cluster_util_test.go @@ -15,7 +15,6 @@ package etcdserver import ( - "reflect" "testing" "go.etcd.io/etcd/api/v3/version" @@ -27,42 +26,6 @@ import ( var testLogger = zap.NewExample() -func TestDecideClusterVersion(t *testing.T) { - tests := []struct { - vers map[string]*version.Versions - wdver *semver.Version - }{ - { - map[string]*version.Versions{"a": {Server: "2.0.0"}}, - semver.Must(semver.NewVersion("2.0.0")), - }, - // unknown - { - map[string]*version.Versions{"a": nil}, - nil, - }, - { - map[string]*version.Versions{"a": {Server: "2.0.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}}, - semver.Must(semver.NewVersion("2.0.0")), - }, - { - map[string]*version.Versions{"a": {Server: "2.1.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}}, - semver.Must(semver.NewVersion("2.1.0")), - }, - { - map[string]*version.Versions{"a": nil, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}}, - nil, - }, - } - - for i, tt := range tests { - dver := decideClusterVersion(testLogger, tt.vers) - if !reflect.DeepEqual(dver, tt.wdver) { - t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver) - } - } -} - func TestIsCompatibleWithVers(t *testing.T) { tests := []struct { vers map[string]*version.Versions @@ -215,52 +178,3 @@ func TestDecideAllowedVersionRange(t *testing.T) { }) } } - -func TestIsMatchedVersions(t *testing.T) { - tests := []struct { - name string - targetVersion *semver.Version - versionMap map[string]*version.Versions - expectedFinished bool - }{ - { - "When downgrade finished", - &semver.Version{Major: 3, Minor: 4}, - map[string]*version.Versions{ - "mem1": {Server: "3.4.1", Cluster: "3.4.0"}, - "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, - "mem3": {Server: "3.4.2", Cluster: "3.4.0"}, - }, - true, - }, - { - "When cannot parse peer version", - &semver.Version{Major: 3, Minor: 4}, - map[string]*version.Versions{ - "mem1": {Server: "3.4.1", Cluster: "3.4"}, - "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, - "mem3": {Server: "3.4.2", Cluster: "3.4.0"}, - }, - false, - }, - { - "When downgrade not finished", - &semver.Version{Major: 3, Minor: 4}, - map[string]*version.Versions{ - "mem1": {Server: "3.4.1", Cluster: "3.4.0"}, - "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, - "mem3": {Server: "3.5.2", Cluster: "3.5.0"}, - }, - false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap) - if actual != tt.expectedFinished { - t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual) - } - }) - } -} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 89fd99185..6058c14de 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -62,6 +62,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor" "go.etcd.io/etcd/server/v3/etcdserver/cindex" + serverversion "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/mvcc" @@ -2430,12 +2431,9 @@ func (s *EtcdServer) ClusterVersion() *semver.Version { return s.cluster.Version() } -// monitorVersions checks the member's version every monitorVersionInterval. -// It updates the cluster version if all members agrees on a higher one. -// It prints out log if there is a member with a higher version than the -// local version. -// TODO switch to updateClusterVersionV3 in 3.6 +// monitorVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed. func (s *EtcdServer) monitorVersions() { + monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s}) for { select { case <-s.FirstCommitInTermNotify(): @@ -2447,31 +2445,7 @@ func (s *EtcdServer) monitorVersions() { if s.Leader() != s.ID() { continue } - - v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) - if v != nil { - // only keep major.minor version for comparison - v = &semver.Version{ - Major: v.Major, - Minor: v.Minor, - } - } - - // if the current version is nil: - // 1. use the decided version if possible - // 2. or use the min cluster version - if s.cluster.Version() == nil { - verStr := version.MinClusterVersion - if v != nil { - verStr = v.String() - } - s.GoAttach(func() { s.updateClusterVersionV2(verStr) }) - continue - } - - if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) { - s.GoAttach(func() { s.updateClusterVersionV2(v.String()) }) - } + monitor.UpdateClusterVersionIfNeeded() } } @@ -2551,12 +2525,13 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) { } } +// monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed. func (s *EtcdServer) monitorDowngrade() { + monitor := serverversion.NewMonitor(s.Logger(), &serverVersionAdapter{s}) t := s.Cfg.DowngradeCheckTime if t == 0 { return } - lg := s.Logger() for { select { case <-time.After(t): @@ -2567,22 +2542,7 @@ func (s *EtcdServer) monitorDowngrade() { if !s.isLeader() { continue } - - d := s.cluster.DowngradeInfo() - if !d.Enabled { - continue - } - - targetVersion := d.TargetVersion - v := semver.Must(semver.NewVersion(targetVersion)) - if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) { - lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion)) - ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - if _, err := s.downgradeCancel(ctx); err != nil { - lg.Warn("failed to cancel downgrade", zap.Error(err)) - } - cancel() - } + monitor.CancelDowngradeIfNeeded() } } diff --git a/server/etcdserver/version/monitor.go b/server/etcdserver/version/monitor.go new file mode 100644 index 000000000..d53b33d4c --- /dev/null +++ b/server/etcdserver/version/monitor.go @@ -0,0 +1,143 @@ +package version + +import ( + "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "go.uber.org/zap" +) + +// Monitor contains logic used by cluster leader to monitor version changes and decide on cluster version or downgrade progress. +type Monitor struct { + lg *zap.Logger + s Server +} + +// Server lists EtcdServer methods needed by Monitor +type Server interface { + GetClusterVersion() *semver.Version + GetDowngradeInfo() *membership.DowngradeInfo + GetVersions() map[string]*version.Versions + UpdateClusterVersion(string) + DowngradeCancel() +} + +func NewMonitor(lg *zap.Logger, storage Server) *Monitor { + return &Monitor{ + lg: lg, + s: storage, + } +} + +// UpdateClusterVersionIfNeeded updates the cluster version if all members agrees on a higher one. +// It prints out log if there is a member with a higher version than the +// local version. +func (m *Monitor) UpdateClusterVersionIfNeeded() { + v := m.decideClusterVersion() + if v != nil { + // only keep major.minor version for comparison + v = &semver.Version{ + Major: v.Major, + Minor: v.Minor, + } + } + + // if the current version is nil: + // 1. use the decided version if possible + // 2. or use the min cluster version + if m.s.GetClusterVersion() == nil { + verStr := version.MinClusterVersion + if v != nil { + verStr = v.String() + } + m.s.UpdateClusterVersion(verStr) + return + } + + if v != nil && membership.IsValidVersionChange(m.s.GetClusterVersion(), v) { + m.s.UpdateClusterVersion(v.String()) + } +} + +func (m *Monitor) CancelDowngradeIfNeeded() { + d := m.s.GetDowngradeInfo() + if !d.Enabled { + return + } + + targetVersion := d.TargetVersion + v := semver.Must(semver.NewVersion(targetVersion)) + if m.versionsMatchTarget(v) { + m.lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion)) + m.s.DowngradeCancel() + } +} + +// decideClusterVersion decides the cluster version based on the versions map. +// The returned version is the min server version in the map, or nil if the min +// version in unknown. +func (m *Monitor) decideClusterVersion() *semver.Version { + vers := m.s.GetVersions() + var cv *semver.Version + lv := semver.Must(semver.NewVersion(version.Version)) + + for mid, ver := range vers { + if ver == nil { + return nil + } + v, err := semver.NewVersion(ver.Server) + if err != nil { + m.lg.Warn( + "failed to parse server version of remote member", + zap.String("remote-member-id", mid), + zap.String("remote-member-version", ver.Server), + zap.Error(err), + ) + return nil + } + if lv.LessThan(*v) { + m.lg.Warn( + "leader found higher-versioned member", + zap.String("local-member-version", lv.String()), + zap.String("remote-member-id", mid), + zap.String("remote-member-version", ver.Server), + ) + } + if cv == nil { + cv = v + } else if v.LessThan(*cv) { + cv = v + } + } + return cv +} + +// versionsMatchTarget returns true if all server versions are equal to target version, otherwise return false. +// It can be used to decide the whether the cluster finishes downgrading to target version. +func (m *Monitor) versionsMatchTarget(targetVersion *semver.Version) bool { + vers := m.s.GetVersions() + for mid, ver := range vers { + if ver == nil { + return false + } + v, err := semver.NewVersion(ver.Cluster) + if err != nil { + m.lg.Warn( + "failed to parse server version of remote member", + zap.String("remote-member-id", mid), + zap.String("remote-member-version", ver.Server), + zap.Error(err), + ) + return false + } + if !targetVersion.Equal(*v) { + m.lg.Warn("remotes server has mismatching etcd version", + zap.String("remote-member-id", mid), + zap.String("current-server-version", v.String()), + zap.String("target-version", targetVersion.String()), + ) + return false + } + } + return true +} diff --git a/server/etcdserver/version/monitor_test.go b/server/etcdserver/version/monitor_test.go new file mode 100644 index 000000000..1a712ef08 --- /dev/null +++ b/server/etcdserver/version/monitor_test.go @@ -0,0 +1,133 @@ +package version + +import ( + "reflect" + "testing" + + "github.com/coreos/go-semver/semver" + "go.uber.org/zap" + + "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" +) + +var testLogger = zap.NewExample() + +func TestDecideClusterVersion(t *testing.T) { + tests := []struct { + vers map[string]*version.Versions + wdver *semver.Version + }{ + { + map[string]*version.Versions{"a": {Server: "2.0.0"}}, + semver.Must(semver.NewVersion("2.0.0")), + }, + // unknown + { + map[string]*version.Versions{"a": nil}, + nil, + }, + { + map[string]*version.Versions{"a": {Server: "2.0.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}}, + semver.Must(semver.NewVersion("2.0.0")), + }, + { + map[string]*version.Versions{"a": {Server: "2.1.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}}, + semver.Must(semver.NewVersion("2.1.0")), + }, + { + map[string]*version.Versions{"a": nil, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}}, + nil, + }, + } + + for i, tt := range tests { + monitor := NewMonitor(testLogger, &storageMock{ + versions: tt.vers, + }) + dver := monitor.decideClusterVersion() + if !reflect.DeepEqual(dver, tt.wdver) { + t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver) + } + } +} + +func TestVersionMatchTarget(t *testing.T) { + tests := []struct { + name string + targetVersion *semver.Version + versionMap map[string]*version.Versions + expectedFinished bool + }{ + { + "When downgrade finished", + &semver.Version{Major: 3, Minor: 4}, + map[string]*version.Versions{ + "mem1": {Server: "3.4.1", Cluster: "3.4.0"}, + "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, + "mem3": {Server: "3.4.2", Cluster: "3.4.0"}, + }, + true, + }, + { + "When cannot parse peer version", + &semver.Version{Major: 3, Minor: 4}, + map[string]*version.Versions{ + "mem1": {Server: "3.4.1", Cluster: "3.4"}, + "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, + "mem3": {Server: "3.4.2", Cluster: "3.4.0"}, + }, + false, + }, + { + "When downgrade not finished", + &semver.Version{Major: 3, Minor: 4}, + map[string]*version.Versions{ + "mem1": {Server: "3.4.1", Cluster: "3.4.0"}, + "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, + "mem3": {Server: "3.5.2", Cluster: "3.5.0"}, + }, + false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + monitor := NewMonitor(testLogger, &storageMock{ + versions: tt.versionMap, + }) + actual := monitor.versionsMatchTarget(tt.targetVersion) + if actual != tt.expectedFinished { + t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual) + } + }) + } +} + +type storageMock struct { + versions map[string]*version.Versions + clusterVersion *semver.Version + downgradeInfo *membership.DowngradeInfo +} + +var _ Server = (*storageMock)(nil) + +func (s *storageMock) UpdateClusterVersion(version string) { + s.clusterVersion = semver.New(version) +} + +func (s *storageMock) DowngradeCancel() { + s.downgradeInfo = nil +} + +func (s *storageMock) GetClusterVersion() *semver.Version { + return s.clusterVersion +} + +func (s *storageMock) GetDowngradeInfo() *membership.DowngradeInfo { + return s.downgradeInfo +} + +func (s *storageMock) GetVersions() map[string]*version.Versions { + return s.versions +}