diff --git a/server/config/config.go b/server/config/config.go index d02e1f4b0..0dac25c41 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -137,8 +137,10 @@ type ServerConfig struct { // InitialCorruptCheck is true to check data corruption on boot // before serving any peer/client traffic. - InitialCorruptCheck bool - CorruptCheckTime time.Duration + InitialCorruptCheck bool + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration // PreVote is true to enable Raft Pre-Vote. PreVote bool diff --git a/server/embed/config.go b/server/embed/config.go index c449f4919..75b6d5d5c 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -314,8 +314,10 @@ type Config struct { // AuthTokenTTL specifies the TTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` - ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` - ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` + ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"` + ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"` // ExperimentalEnableV2V3 configures URLs that expose deprecated V2 API working on V3 store. // Deprecated in v3.5. // TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913) @@ -501,6 +503,9 @@ func NewConfig() *Config { ExperimentalMemoryMlock: false, ExperimentalTxnModeWriteWithSharedBuffer: true, + ExperimentalCompactHashCheckEnabled: false, + ExperimentalCompactHashCheckTime: time.Minute, + V2Deprecation: config.V2_DEPR_DEFAULT, } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) @@ -698,6 +703,10 @@ func (cfg *Config) Validate() error { return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") } + if cfg.ExperimentalCompactHashCheckTime <= 0 { + return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime) + } + return nil } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index a279174e7..f612e96ef 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -203,6 +203,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { HostWhitelist: cfg.HostWhitelist, InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled, + CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime, PreVote: cfg.PreVote, Logger: cfg.logger, ForceNewCluster: cfg.ForceNewCluster, @@ -339,6 +341,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), + zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled), + zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime), zap.String("auto-compaction-mode", sc.AutoCompactionMode), zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention), zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 14b58c47d..59d3a973f 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -281,6 +281,8 @@ func newConfig() *config { // experimental fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") + fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.") + fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hashes.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") // TODO: delete in v3.7 diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index c148c8730..d7ae377b0 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -306,13 +306,12 @@ func (cm *corruptionChecker) CompactHashCheck() { cm.mux.Unlock() cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1)) return - } else { - cm.lg.Warn("skipped checking hash; was not able to check all peers", - zap.Int("number-of-peers-checked", peersChecked), - zap.Int("number-of-peers", len(peers)), - zap.Int64("revision", hash.Revision), - ) } + cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int("number-of-peers", len(peers)), + zap.Int64("revision", hash.Revision), + ) } cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes))) return diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 184947a5d..cbf11738a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -108,8 +108,7 @@ var ( // monitorVersionInterval should be smaller than the timeout // on the connection. Or we will not be able to reuse the connection // (since it will timeout). - monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second - CompactHashCheckInterval = 15 * time.Second + monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes)) storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")) @@ -2540,9 +2539,13 @@ func (s *EtcdServer) monitorKVHash() { } func (s *EtcdServer) monitorCompactHash() { + if !s.Cfg.CompactHashCheckEnabled { + return + } + t := s.Cfg.CompactHashCheckTime for { select { - case <-time.After(CompactHashCheckInterval): + case <-time.After(t): case <-s.stopping: return } diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 63a9fad61..0f8359ed4 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -177,8 +177,10 @@ type etcdProcessClusterConfig struct { rollingStart bool logLevel string - MaxConcurrentStreams uint32 // default is math.MaxUint32 - CorruptCheckTime time.Duration + MaxConcurrentStreams uint32 // default is math.MaxUint32 + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -330,6 +332,12 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* if cfg.CorruptCheckTime != 0 { args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime)) } + if cfg.CompactHashCheckEnabled { + args = append(args, "--experimental-compact-hash-check-enabled") + } + if cfg.CompactHashCheckTime != 0 { + args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) + } etcdCfgs[i] = &etcdServerProcessConfig{ lg: lg, diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index ef788d1c9..7044d4a06 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -22,9 +22,8 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/etcdserverpb" - clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/datadir" - "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" ) @@ -134,10 +133,13 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) { } func TestCompactHashCheckDetectCorruption(t *testing.T) { + checkTime := time.Second BeforeTest(t) epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{ - clusterSize: 3, - keepDataDir: true, + clusterSize: 3, + keepDataDir: true, + CompactHashCheckEnabled: true, + CompactHashCheckTime: checkTime, }) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) @@ -171,7 +173,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { assert.NoError(t, err) _, err = cc.Compact(5) assert.NoError(t, err) - time.Sleep(etcdserver.CompactHashCheckInterval * 11 / 10) + time.Sleep(checkTime * 11 / 10) alarmResponse, err := cc.AlarmList() assert.NoError(t, err, "error on alarm list") assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)