diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 564ad5e7a..6332917a7 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -252,7 +252,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { // newly started member ("memberInitialized==false") // does not need corruption check if memberInitialized && srvcfg.InitialCorruptCheck { - if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil { + if err = e.Server.CorruptionChecker().InitialCheck(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 152cb0a91..f467b0c53 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -32,7 +32,12 @@ import ( "go.uber.org/zap" ) -type corruptionMonitor struct { +type CorruptionChecker interface { + InitialCheck() error + PeriodicCheck() error +} + +type corruptionChecker struct { lg *zap.Logger hasher Hasher @@ -47,8 +52,8 @@ type Hasher interface { TriggerCorruptAlarm(uint64) } -func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { - return &corruptionMonitor{ +func NewCorruptionChecker(lg *zap.Logger, s *EtcdServer) *corruptionChecker { + return &corruptionChecker{ lg: lg, hasher: hasherAdapter{s, s.KV().HashStorage()}, } @@ -74,7 +79,7 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) { // InitialCheck compares initial hash values with its peers // before serving any peer/client traffic. Only mismatch when hashes // are different at requested revision, with same compact revision. -func (cm *corruptionMonitor) InitialCheck() error { +func (cm *corruptionChecker) InitialCheck() error { cm.lg.Info( "starting initial corruption check", @@ -153,7 +158,7 @@ func (cm *corruptionMonitor) InitialCheck() error { return nil } -func (cm *corruptionMonitor) periodicCheck() error { +func (cm *corruptionChecker) PeriodicCheck() error { h, rev, err := cm.hasher.HashByRev(0) if err != nil { return err diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index d2b976f74..17e5aabb0 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -88,7 +88,7 @@ func TestInitialCheck(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - monitor := corruptionMonitor{ + monitor := corruptionChecker{ lg: zaptest.NewLogger(t), hasher: &tc.hasher, } @@ -205,11 +205,11 @@ func TestPeriodicCheck(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - monitor := corruptionMonitor{ + monitor := corruptionChecker{ lg: zaptest.NewLogger(t), hasher: &tc.hasher, } - err := monitor.periodicCheck() + err := monitor.PeriodicCheck() if gotError := err != nil; gotError != tc.expectError { t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e98bb3259..c5c43bacf 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -295,7 +295,8 @@ type EtcdServer struct { *AccessController // forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount. // Should only be set within apply code path. Used to force snapshot after cluster version downgrade. - forceSnapshot bool + forceSnapshot bool + corruptionChecker CorruptionChecker } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -371,6 +372,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { CompactionSleepInterval: cfg.CompactionSleepInterval, } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) + srv.corruptionChecker = NewCorruptionChecker(cfg.Logger, srv) srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost)) @@ -2199,7 +2201,6 @@ func (s *EtcdServer) monitorKVHash() { zap.String("local-member-id", s.MemberId().String()), zap.Duration("interval", t), ) - monitor := NewCorruptionMonitor(lg, s) for { select { case <-s.stopping: @@ -2209,7 +2210,7 @@ func (s *EtcdServer) monitorKVHash() { if !s.isLeader() { continue } - if err := monitor.periodicCheck(); err != nil { + if err := s.corruptionChecker.PeriodicCheck(); err != nil { lg.Warn("failed to check hash KV", zap.Error(err)) } } @@ -2416,3 +2417,7 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { } } } + +func (s *EtcdServer) CorruptionChecker() CorruptionChecker { + return s.corruptionChecker +}