diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 55aca9024..a279174e7 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -248,7 +248,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { // newly started member ("memberInitialized==false") // does not need corruption check if memberInitialized && srvcfg.InitialCorruptCheck { - if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil { + if err = e.Server.CorruptionChecker().InitialCheck(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 2d88cc901..dc7e77548 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -33,7 +33,12 @@ import ( "go.uber.org/zap" ) -type corruptionMonitor struct { +type CorruptionChecker interface { + InitialCheck() error + PeriodicCheck() error +} + +type corruptionChecker struct { lg *zap.Logger hasher Hasher @@ -48,8 +53,8 @@ type Hasher interface { TriggerCorruptAlarm(uint64) } -func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { - return &corruptionMonitor{ +func NewCorruptionChecker(lg *zap.Logger, s *EtcdServer) *corruptionChecker { + return &corruptionChecker{ lg: lg, hasher: hasherAdapter{s, s.KV().HashStorage()}, } @@ -79,7 +84,7 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) { // InitialCheck compares initial hash values with its peers // before serving any peer/client traffic. Only mismatch when hashes // are different at requested revision, with same compact revision. -func (cm *corruptionMonitor) InitialCheck() error { +func (cm *corruptionChecker) InitialCheck() error { cm.lg.Info( "starting initial corruption check", @@ -158,7 +163,7 @@ func (cm *corruptionMonitor) InitialCheck() error { return nil } -func (cm *corruptionMonitor) periodicCheck() error { +func (cm *corruptionChecker) PeriodicCheck() error { h, rev, err := cm.hasher.HashByRev(0) if err != nil { return err diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 2f98b1800..77a1a859d 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -88,7 +88,7 @@ func TestInitialCheck(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - monitor := corruptionMonitor{ + monitor := corruptionChecker{ lg: zaptest.NewLogger(t), hasher: &tc.hasher, } @@ -205,11 +205,11 @@ func TestPeriodicCheck(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - monitor := corruptionMonitor{ + monitor := corruptionChecker{ lg: zaptest.NewLogger(t), hasher: &tc.hasher, } - err := monitor.periodicCheck() + err := monitor.PeriodicCheck() if gotError := err != nil; gotError != tc.expectError { t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1fd5a98f8..78c65df74 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -293,6 +293,7 @@ type EtcdServer struct { firstCommitInTermC chan struct{} *AccessController + corruptionChecker CorruptionChecker } type backendHooks struct { @@ -629,6 +630,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { ) } } + srv.corruptionChecker = NewCorruptionChecker(cfg.Logger, srv) srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) @@ -2520,7 +2522,6 @@ func (s *EtcdServer) monitorKVHash() { zap.String("local-member-id", s.ID().String()), zap.Duration("interval", t), ) - monitor := NewCorruptionMonitor(lg, s) for { select { case <-s.stopping: @@ -2530,7 +2531,7 @@ func (s *EtcdServer) monitorKVHash() { if !s.isLeader() { continue } - if err := monitor.periodicCheck(); err != nil { + if err := s.corruptionChecker.PeriodicCheck(); err != nil { lg.Warn("failed to check hash KV", zap.Error(err)) } } @@ -2773,3 +2774,7 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { } return be.Defrag() } + +func (s *EtcdServer) CorruptionChecker() CorruptionChecker { + return s.corruptionChecker +}