Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)

commit e7da7ebf7e
parent 26620387c7

add flag to allow downgrade from 3.5

Signed-off-by: Siyuan Zhang <sizhang@google.com>
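In outline, the commit registers a new next-cluster-version-compatible boolean flag, threads a NextClusterVersionCompatible field from the command line through embed.Config and etcdserver.ServerConfig into membership.RaftCluster, and replaces the hard failure in mustDetectDowngrade with a detectDowngrade helper that tolerates a cluster version exactly one minor version above the local server when the flag is enabled.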
@@ -277,7 +277,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 		return err
 	}
 
-	s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)
+	s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics, true)
 	if err != nil {
 		return err
 	}
@@ -363,6 +363,8 @@ type Config struct {
 	// UnsafeNoFsync disables all uses of fsync.
 	// Setting this is unsafe and will cause data loss.
 	UnsafeNoFsync bool `json:"unsafe-no-fsync"`
+	// NextClusterVersionCompatible enables 3.4 to be compatible with next version 3.5, to allow 3.4 server to join 3.5 cluster and start on 3.5 schema.
+	NextClusterVersionCompatible bool `json:"next-cluster-version-compatible"`
 }
 
 // configYAML holds the config suitable for yaml parsing
embed/etcd.go (101 changed lines)
@@ -164,56 +164,57 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 	backendFreelistType := parseBackendFreelistType(cfg.ExperimentalBackendFreelistType)
 
 	srvcfg := etcdserver.ServerConfig{
 		Name:                         cfg.Name,
 		ClientURLs:                   cfg.AdvertiseClientUrls,
 		PeerURLs:                     cfg.AdvertisePeerUrls,
 		DataDir:                      cfg.Dir,
 		DedicatedWALDir:              cfg.WalDir,
 		SnapshotCount:                cfg.SnapshotCount,
 		SnapshotCatchUpEntries:       cfg.SnapshotCatchUpEntries,
 		MaxSnapFiles:                 cfg.MaxSnapFiles,
 		MaxWALFiles:                  cfg.MaxWalFiles,
 		InitialPeerURLsMap:           urlsmap,
 		InitialClusterToken:          token,
 		DiscoveryURL:                 cfg.Durl,
 		DiscoveryProxy:               cfg.Dproxy,
 		NewCluster:                   cfg.IsNewCluster(),
 		PeerTLSInfo:                  cfg.PeerTLSInfo,
 		TickMs:                       cfg.TickMs,
 		ElectionTicks:                cfg.ElectionTicks(),
 		InitialElectionTickAdvance:   cfg.InitialElectionTickAdvance,
 		AutoCompactionRetention:      autoCompactionRetention,
 		AutoCompactionMode:           cfg.AutoCompactionMode,
 		QuotaBackendBytes:            cfg.QuotaBackendBytes,
 		BackendBatchLimit:            cfg.BackendBatchLimit,
 		BackendFreelistType:          backendFreelistType,
 		BackendBatchInterval:         cfg.BackendBatchInterval,
 		MaxTxnOps:                    cfg.MaxTxnOps,
 		MaxRequestBytes:              cfg.MaxRequestBytes,
 		MaxConcurrentStreams:         cfg.MaxConcurrentStreams,
 		StrictReconfigCheck:          cfg.StrictReconfigCheck,
 		ClientCertAuthEnabled:        cfg.ClientTLSInfo.ClientCertAuth,
 		AuthToken:                    cfg.AuthToken,
 		BcryptCost:                   cfg.BcryptCost,
 		TokenTTL:                     cfg.AuthTokenTTL,
 		CORS:                         cfg.CORS,
 		HostWhitelist:                cfg.HostWhitelist,
 		InitialCorruptCheck:          cfg.ExperimentalInitialCorruptCheck,
 		CorruptCheckTime:             cfg.ExperimentalCorruptCheckTime,
 		PreVote:                      cfg.PreVote,
 		Logger:                       cfg.logger,
 		LoggerConfig:                 cfg.loggerConfig,
 		LoggerCore:                   cfg.loggerCore,
 		LoggerWriteSyncer:            cfg.loggerWriteSyncer,
 		Debug:                        cfg.Debug,
 		ForceNewCluster:              cfg.ForceNewCluster,
 		EnableGRPCGateway:            cfg.EnableGRPCGateway,
 		UnsafeNoFsync:                cfg.UnsafeNoFsync,
+		NextClusterVersionCompatible: cfg.NextClusterVersionCompatible,
 		EnableLeaseCheckpoint:        cfg.ExperimentalEnableLeaseCheckpoint,
 		LeaseCheckpointPersist:       cfg.ExperimentalEnableLeaseCheckpointPersist,
 		CompactionBatchLimit:         cfg.ExperimentalCompactionBatchLimit,
 		WatchProgressNotifyInterval:  cfg.ExperimentalWatchProgressNotifyInterval,
 		WarningApplyDuration:         cfg.ExperimentalWarningApplyDuration,
 	}
 	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
 	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
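Since the new field is plumbed through embed.Config above, the flag should also be reachable from embedded deployments. A minimal sketch, assuming an etcd 3.4 build that includes this commit; the data directory and log message are illustrative:

package main

import (
	"log"

	"go.etcd.io/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "member1.etcd" // illustrative data directory
	// Let this 3.4 server start on data written by 3.5 and join a 3.5 cluster.
	cfg.NextClusterVersionCompatible = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("etcd member is ready")
}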
@@ -128,7 +128,7 @@ func prepareBackend() backend.Backend {
 
 func rebuildStoreV2() (v2store.Store, uint64) {
 	var index uint64
-	cl := membership.NewCluster(zap.NewExample(), "")
+	cl := membership.NewCluster(zap.NewExample(), "", true)
 
 	waldir := migrateWALdir
 	if len(waldir) == 0 {
@@ -194,6 +194,7 @@ func newConfig() *config {
 	fs.BoolVar(&cfg.ec.StrictReconfigCheck, "strict-reconfig-check", cfg.ec.StrictReconfigCheck, "Reject reconfiguration requests that would cause quorum loss.")
 	fs.BoolVar(&cfg.ec.EnableV2, "enable-v2", cfg.ec.EnableV2, "Accept etcd V2 client requests.")
 	fs.BoolVar(&cfg.ec.PreVote, "pre-vote", cfg.ec.PreVote, "Enable to run an additional Raft election phase.")
+	fs.BoolVar(&cfg.ec.NextClusterVersionCompatible, "next-cluster-version-compatible", false, "Enable 3.4 to be compatible with next version 3.5, to allow 3.4 server to join 3.5 cluster and start on 3.5 schema")
 
 	// proxy
 	fs.Var(cfg.cf.proxy, "proxy", fmt.Sprintf("Valid values include %q", cfg.cf.proxy.Valids()))
@@ -435,7 +435,7 @@ func startProxy(cfg *config) error {
 
 	clientURLs := []string{}
 	uf := func() []string {
-		gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr)
+		gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr, cfg.ec.NextClusterVersionCompatible)
 		if gerr != nil {
 			if lg != nil {
 				lg.Warn(
@@ -125,6 +125,8 @@ Clustering:
 		Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
 	--enable-v2 '` + strconv.FormatBool(embed.DefaultEnableV2) + `'
 		Accept etcd V2 client requests.
+	--next-cluster-version-compatible 'false'
+		Enable 3.4 to be compatible with next version 3.5, to allow 3.4 server to join 3.5 cluster and start on 3.5 schema.
 
 Security:
 	--cert-file ''
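Operationally, the help entry reads like any other boolean option, so a downgraded 3.4 member would presumably be started with etcd --next-cluster-version-compatible=true alongside the usual clustering flags; the flag defaults to 'false'.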
@@ -41,6 +41,7 @@ var (
 		"3.2.0": {AuthCapability: true, V3rpcCapability: true},
 		"3.3.0": {AuthCapability: true, V3rpcCapability: true},
 		"3.4.0": {AuthCapability: true, V3rpcCapability: true},
+		"3.5.0": {AuthCapability: true, V3rpcCapability: true},
 	}
 
 	enableMapMu sync.RWMutex
@@ -59,6 +59,8 @@ type RaftCluster struct {
 	// removed contains the ids of removed members in the cluster.
 	// removed id cannot be reused.
 	removed map[types.ID]bool
+	// NextClusterVersionCompatible allows downgrade from 3.5 to 3.4.
+	NextClusterVersionCompatible bool
 }
 
 // ConfigChangeContext represents a context for confChange.
@@ -72,8 +74,8 @@ type ConfigChangeContext struct {
 
 // NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
 // cluster with raft learner member.
-func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
-	c := NewCluster(lg, token)
+func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap, nextClusterVersionCompatible bool) (*RaftCluster, error) {
+	c := NewCluster(lg, token, nextClusterVersionCompatible)
 	for name, urls := range urlsmap {
 		m := NewMember(name, urls, token, nil)
 		if _, ok := c.members[m.ID]; ok {
@@ -88,8 +90,8 @@ func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap)
 	return c, nil
 }
 
-func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member) *RaftCluster {
-	c := NewCluster(lg, token)
+func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member, nextClusterVersionCompatible bool) *RaftCluster {
+	c := NewCluster(lg, token, nextClusterVersionCompatible)
 	c.cid = id
 	for _, m := range membs {
 		c.members[m.ID] = m
@@ -97,12 +99,13 @@ func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*M
 	return c
 }
 
-func NewCluster(lg *zap.Logger, token string) *RaftCluster {
+func NewCluster(lg *zap.Logger, token string, nextClusterVersionCompatible bool) *RaftCluster {
 	return &RaftCluster{
 		lg:      lg,
 		token:   token,
-		members: make(map[types.ID]*Member),
-		removed: make(map[types.ID]bool),
+		NextClusterVersionCompatible: nextClusterVersionCompatible,
+		members:                      make(map[types.ID]*Member),
+		removed:                      make(map[types.ID]bool),
 	}
 }
 
@@ -248,7 +251,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
 
 	c.members, c.removed = membersFromStore(c.lg, c.v2store)
 	c.version = clusterVersionFromStore(c.lg, c.v2store)
-	mustDetectDowngrade(c.lg, c.version)
+	mustDetectDowngrade(c.lg, c.version, c.NextClusterVersionCompatible)
 	onSet(c.lg, c.version)
 
 	for _, m := range c.members {
@@ -567,7 +570,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
 	}
 	oldVer := c.version
 	c.version = ver
-	mustDetectDowngrade(c.lg, c.version)
+	mustDetectDowngrade(c.lg, c.version, c.NextClusterVersionCompatible)
 	if c.v2store != nil {
 		mustSaveClusterVersionToStore(c.v2store, ver)
 	}
@@ -786,23 +789,42 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
 	return nil
 }
 
-func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) {
-	lv := semver.Must(semver.NewVersion(version.Version))
-	// only keep major.minor version for comparison against cluster version
-	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
-	if cv != nil && lv.LessThan(*cv) {
+func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, nextClusterVersionCompatible bool) {
+	err := detectDowngrade(cv, nextClusterVersionCompatible)
+	if err != nil {
 		if lg != nil {
 			lg.Fatal(
-				"invalid downgrade; server version is lower than determined cluster version",
+				err.Error(),
 				zap.String("current-server-version", version.Version),
 				zap.String("determined-cluster-version", version.Cluster(cv.String())),
 			)
 		} else {
-			plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
+			plog.Fatal(err)
 		}
 	}
 }
 
+func detectDowngrade(cv *semver.Version, nextClusterVersionCompatible bool) error {
+	if cv == nil {
+		return nil
+	}
+	lv := semver.Must(semver.NewVersion(version.Version))
+	// only keep major.minor version for comparison against cluster version
+	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
+	if !lv.LessThan(*cv) {
+		return nil
+	}
+	// allow 3.4 server to join 3.5 cluster. Note the local data schema will
+	// be automatically migrated to 3.4 if `--next-cluster-version-compatible`
+	// is enabled (true). Users can also execute `etcdutl migrate` to migrate
+	// the data before starting the server.
+	oneMinorVersionDown := &semver.Version{Major: cv.Major, Minor: cv.Minor - 1}
+	if !nextClusterVersionCompatible || !lv.Equal(*oneMinorVersionDown) {
+		return fmt.Errorf("invalid downgrade; (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
+	}
+	return nil
+}
+
 // IsLocalMemberLearner returns if the local member is raft learner
 func (c *RaftCluster) IsLocalMemberLearner() bool {
 	c.Lock()
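The heart of detectDowngrade is a plain semver comparison, which is easy to check in isolation. Below is a self-contained sketch of the same gate using the coreos go-semver library the file already imports; allowedDowngrade and the versions in main are illustrative, not etcd names:

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// allowedDowngrade mirrors the detectDowngrade logic above: joining is fine
// when the local version lv is at least the cluster version cv, and a
// downgrade is tolerated only when lv is exactly one minor version below cv
// and the compatibility flag is set.
func allowedDowngrade(lv, cv *semver.Version, nextClusterVersionCompatible bool) bool {
	// only keep major.minor for comparison, as the server does
	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
	if !lv.LessThan(*cv) {
		return true
	}
	oneMinorVersionDown := &semver.Version{Major: cv.Major, Minor: cv.Minor - 1}
	return nextClusterVersionCompatible && lv.Equal(*oneMinorVersionDown)
}

func main() {
	lv := semver.Must(semver.NewVersion("3.4.0")) // local server version
	for _, tc := range []struct {
		cv   string
		flag bool
	}{
		{"3.5.0", false}, // refused: downgrade without the flag
		{"3.5.0", true},  // allowed: exactly one minor version down
		{"3.6.0", true},  // refused: two minor versions down
	} {
		cv := semver.Must(semver.NewVersion(tc.cv))
		fmt.Printf("cluster %s, flag=%t -> allowed=%t\n", tc.cv, tc.flag, allowedDowngrade(lv, cv, tc.flag))
	}
}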
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"testing"
 
+	"github.com/coreos/go-semver/semver"
 	"go.etcd.io/etcd/etcdserver/api/v2store"
 	"go.etcd.io/etcd/pkg/mock/mockstore"
 	"go.etcd.io/etcd/pkg/testutil"
@@ -276,7 +277,7 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
 }
 
 func TestClusterValidateConfigurationChange(t *testing.T) {
-	cl := NewCluster(zap.NewExample(), "")
+	cl := NewCluster(zap.NewExample(), "", false)
 	cl.SetStore(v2store.New())
 	for i := 1; i <= 4; i++ {
 		attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
@@ -946,3 +947,54 @@ func TestIsReadyToPromoteMember(t *testing.T) {
 		}
 	}
 }
+
+func TestDetectDowngrade(t *testing.T) {
+	tests := []struct {
+		clusterVersion               string
+		nextClusterVersionCompatible bool
+		expectErr                    bool
+	}{
+		{
+			expectErr: false,
+		},
+		{
+			clusterVersion: "3.5.0",
+			expectErr:      true,
+		},
+		{
+			clusterVersion:               "3.5.0",
+			nextClusterVersionCompatible: true,
+			expectErr:                    false,
+		},
+		{
+			clusterVersion: "3.6.0",
+			expectErr:      true,
+		},
+		{
+			clusterVersion:               "3.6.0",
+			nextClusterVersionCompatible: true,
+			expectErr:                    true,
+		},
+		{
+			clusterVersion: "3.4.0",
+			expectErr:      false,
+		},
+		{
+			clusterVersion: "3.3.0",
+			expectErr:      false,
+		},
+	}
+	for i, tt := range tests {
+		var cv *semver.Version
+		if len(tt.clusterVersion) > 0 {
+			cv = semver.Must(semver.NewVersion(tt.clusterVersion))
+		}
+		err := detectDowngrade(cv, tt.nextClusterVersionCompatible)
+		if tt.expectErr && err == nil {
+			t.Errorf("%d: expect detectDowngrade error, got nil", i)
+		}
+		if !tt.expectErr && err != nil {
+			t.Errorf("%d: expect no detectDowngrade error, got %v", i, err)
+		}
+	}
+}
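Assuming the standard 3.4 repository layout, this table can presumably be exercised on its own with go test -run TestDetectDowngrade ./etcdserver/api/membership.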
@@ -35,7 +35,7 @@ import (
 // isMemberBootstrapped tries to check if the given member has been bootstrapped
 // in the given cluster.
 func isMemberBootstrapped(lg *zap.Logger, cl *membership.RaftCluster, member string, rt http.RoundTripper, timeout time.Duration) bool {
-	rcl, err := getClusterFromRemotePeers(lg, getRemotePeerURLs(cl, member), timeout, false, rt)
+	rcl, err := getClusterFromRemotePeers(lg, getRemotePeerURLs(cl, member), timeout, false, rt, cl.NextClusterVersionCompatible)
 	if err != nil {
 		return false
 	}
@@ -57,12 +57,12 @@ func isMemberBootstrapped(lg *zap.Logger, cl *membership.RaftCluster, member str
 // response, an error is returned.
 // Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
 // 10 second is enough for building connection and finishing request.
-func GetClusterFromRemotePeers(lg *zap.Logger, urls []string, rt http.RoundTripper) (*membership.RaftCluster, error) {
-	return getClusterFromRemotePeers(lg, urls, 10*time.Second, true, rt)
+func GetClusterFromRemotePeers(lg *zap.Logger, urls []string, rt http.RoundTripper, nextClusterVersionCompatible bool) (*membership.RaftCluster, error) {
+	return getClusterFromRemotePeers(lg, urls, 10*time.Second, true, rt, nextClusterVersionCompatible)
 }
 
 // If logerr is true, it prints out more error messages.
-func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) {
+func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper, nextClusterVersionCompatible bool) (*membership.RaftCluster, error) {
 	cc := &http.Client{
 		Transport: rt,
 		Timeout:   timeout,
@@ -128,7 +128,7 @@ func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Durat
 	// if membership members are not present then the raft cluster formed will be
 	// an invalid empty cluster hence return failed to get raft cluster member(s) from the given urls error
 	if len(membs) > 0 {
-		return membership.NewClusterFromMembers(lg, "", id, membs), nil
+		return membership.NewClusterFromMembers(lg, "", id, membs, nextClusterVersionCompatible), nil
 	}
 	return nil, fmt.Errorf("failed to get raft cluster member(s) from the given URLs")
 }
@@ -172,6 +172,8 @@ type ServerConfig struct {
 	// UnsafeNoFsync disables all uses of fsync.
 	// Setting this is unsafe and will cause data loss.
 	UnsafeNoFsync bool `json:"unsafe-no-fsync"`
+	// NextClusterVersionCompatible enables 3.4 to be compatible with next version 3.5, to allow 3.4 server to join 3.5 cluster and start on 3.5 schema.
+	NextClusterVersionCompatible bool `json:"next-cluster-version-compatible"`
 }
 
 // VerifyBootstrap sanity-checks the initial config for bootstrap case
@@ -593,7 +593,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
 	} else {
 		plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
 	}
-	cl := membership.NewCluster(cfg.Logger, "")
+	cl := membership.NewCluster(cfg.Logger, "", cfg.NextClusterVersionCompatible)
 	cl.SetID(id, cid)
 	s := raft.NewMemoryStorage()
 	if snapshot != nil {
@@ -690,7 +690,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
 		plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
 	}
 
-	cl := membership.NewCluster(cfg.Logger, "")
+	cl := membership.NewCluster(cfg.Logger, "", cfg.NextClusterVersionCompatible)
 	cl.SetID(id, cid)
 	s := raft.NewMemoryStorage()
 	if snapshot != nil {
@@ -230,7 +230,7 @@ func TestConfigChangeBlocksApply(t *testing.T) {
 
 func TestProcessDuplicatedAppRespMessage(t *testing.T) {
 	n := newNopReadyNode()
-	cl := membership.NewCluster(zap.NewExample(), "abc")
+	cl := membership.NewCluster(zap.NewExample(), "abc", false)
 
 	rs := raft.NewMemoryStorage()
 	p := mockstorage.NewStorageRecorder("")
@@ -360,11 +360,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 		if err = cfg.VerifyJoinExisting(); err != nil {
 			return nil, err
 		}
-		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
+		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, cfg.NextClusterVersionCompatible)
 		if err != nil {
 			return nil, err
 		}
-		existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
+		existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt, cfg.NextClusterVersionCompatible)
 		if gerr != nil {
 			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
 		}
@@ -386,7 +386,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 		if err = cfg.VerifyBootstrap(); err != nil {
 			return nil, err
 		}
-		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
+		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, cfg.NextClusterVersionCompatible)
 		if err != nil {
 			return nil, err
 		}
@@ -408,7 +408,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 			if checkDuplicateURL(urlsmap) {
 				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
 			}
-			if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
+			if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap, cfg.NextClusterVersionCompatible); err != nil {
 				return nil, err
 			}
 		}
@@ -501,7 +501,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
 }
 
 func TestApplyConfChangeError(t *testing.T) {
-	cl := membership.NewCluster(zap.NewExample(), "")
+	cl := membership.NewCluster(zap.NewExample(), "", false)
 	cl.SetStore(v2store.New())
 	for i := 1; i <= 4; i++ {
 		cl.AddMember(&membership.Member{ID: types.ID(i)})
@@ -589,7 +589,7 @@ func TestApplyConfChangeError(t *testing.T) {
 }
 
 func TestApplyConfChangeShouldStop(t *testing.T) {
-	cl := membership.NewCluster(zap.NewExample(), "")
+	cl := membership.NewCluster(zap.NewExample(), "", false)
 	cl.SetStore(v2store.New())
 	for i := 1; i <= 3; i++ {
 		cl.AddMember(&membership.Member{ID: types.ID(i)})
@@ -633,7 +633,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 // TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex
 // where consistIndex equals to applied index.
 func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
-	cl := membership.NewCluster(zap.NewExample(), "")
+	cl := membership.NewCluster(zap.NewExample(), "", false)
 	cl.SetStore(v2store.New())
 	cl.AddMember(&membership.Member{ID: types.ID(1)})
 	r := newRaftNode(raftNodeConfig{
@@ -679,7 +679,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
 // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
 // if the local member is removed along with other conf updates.
 func TestApplyMultiConfChangeShouldStop(t *testing.T) {
-	cl := membership.NewCluster(zap.NewExample(), "")
+	cl := membership.NewCluster(zap.NewExample(), "", false)
 	cl.SetStore(v2store.New())
 	for i := 1; i <= 5; i++ {
 		cl.AddMember(&membership.Member{ID: types.ID(i)})
@@ -1038,7 +1038,7 @@ func TestSnapshot(t *testing.T) {
 func TestSnapshotOrdering(t *testing.T) {
 	n := newNopReadyNode()
 	st := v2store.New()
-	cl := membership.NewCluster(zap.NewExample(), "abc")
+	cl := membership.NewCluster(zap.NewExample(), "abc", false)
 	cl.SetStore(st)
 
 	testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
@@ -1191,7 +1191,7 @@ func TestTriggerSnap(t *testing.T) {
 func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	n := newNopReadyNode()
 	st := v2store.New()
-	cl := membership.NewCluster(zap.NewExample(), "abc")
+	cl := membership.NewCluster(zap.NewExample(), "abc", false)
 	cl.SetStore(st)
 
 	testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
@@ -1638,7 +1638,7 @@ func TestGetOtherPeerURLs(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		cl := membership.NewClusterFromMembers(zap.NewExample(), "", types.ID(0), tt.membs)
+		cl := membership.NewClusterFromMembers(zap.NewExample(), "", types.ID(0), tt.membs, false)
 		self := "1"
 		urls := getRemotePeerURLs(cl, self)
 		if !reflect.DeepEqual(urls, tt.wurls) {
@@ -1790,7 +1790,7 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
 }
 
 func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
-	c := membership.NewCluster(zap.NewExample(), "")
+	c := membership.NewCluster(zap.NewExample(), "", false)
 	for _, m := range membs {
 		c.AddMember(m)
 	}
@@ -35,7 +35,7 @@ func TestLongestConnected(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	clus, err := membership.NewClusterFromURLsMap(zap.NewExample(), "test", umap)
+	clus, err := membership.NewClusterFromURLsMap(zap.NewExample(), "test", umap, false)
 	if err != nil {
 		t.Fatal(err)
 	}