diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 28b0ff92c..55aca9024 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -247,8 +247,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { // newly started member ("memberInitialized==false") // does not need corruption check - if memberInitialized { - if err = e.Server.CheckInitialHashKV(); err != nil { + if memberInitialized && srvcfg.InitialCorruptCheck { + if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 256c15308..3b02aac44 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -33,33 +33,78 @@ import ( "go.uber.org/zap" ) -// CheckInitialHashKV compares initial hash values with its peers +type corruptionMonitor struct { + lg *zap.Logger + + hasher Hasher +} + +type Hasher interface { + mvcc.Hasher + ReqTimeout() time.Duration + MemberId() types.ID + PeerHashByRev(int64) []*peerHashKVResp + LinearizableReadNotify(context.Context) error + TriggerCorruptAlarm(uint64) +} + +func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { + return &corruptionMonitor{ + lg: lg, + hasher: hasherAdapter{s}, + } +} + +type hasherAdapter struct { + *EtcdServer +} + +func (h hasherAdapter) MemberId() types.ID { + return h.EtcdServer.ID() +} + +func (h hasherAdapter) Hash() (hash uint32, revision int64, err error) { + return h.EtcdServer.KV().Hash() +} + +func (h hasherAdapter) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) { + return h.EtcdServer.KV().HashByRev(rev) +} + +func (h hasherAdapter) ReqTimeout() time.Duration { + return h.EtcdServer.Cfg.ReqTimeout() +} + +func (h hasherAdapter) PeerHashByRev(rev int64) []*peerHashKVResp { + return h.EtcdServer.getPeerHashKVs(rev) +} + +func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) { + h.EtcdServer.triggerCorruptAlarm(memberID) +} + +// InitialCheck compares initial hash values with its peers // before serving any peer/client traffic. Only mismatch when hashes // are different at requested revision, with same compact revision. -func (s *EtcdServer) CheckInitialHashKV() error { - if !s.Cfg.InitialCorruptCheck { - return nil - } +func (cm *corruptionMonitor) InitialCheck() error { - lg := s.Logger() - - lg.Info( + cm.lg.Info( "starting initial corruption check", - zap.String("local-member-id", s.ID().String()), - zap.Duration("timeout", s.Cfg.ReqTimeout()), + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Duration("timeout", cm.hasher.ReqTimeout()), ) - h, rev, crev, err := s.kv.HashByRev(0) + h, rev, crev, err := cm.hasher.HashByRev(0) if err != nil { - return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err) + return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err) } - peers := s.getPeerHashKVs(rev) + peers := cm.hasher.PeerHashByRev(rev) mismatch := 0 for _, p := range peers { if p.resp != nil { peerID := types.ID(p.resp.Header.MemberId) fields := []zap.Field{ - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -72,10 +117,10 @@ func (s *EtcdServer) CheckInitialHashKV() error { if h != p.resp.Hash { if crev == p.resp.CompactRevision { - lg.Warn("found different hash values from remote peer", fields...) + cm.lg.Warn("found different hash values from remote peer", fields...) mismatch++ } else { - lg.Warn("found different compact revision values from remote peer", fields...) + cm.lg.Warn("found different compact revision values from remote peer", fields...) } } @@ -85,9 +130,9 @@ func (s *EtcdServer) CheckInitialHashKV() error { if p.err != nil { switch p.err { case rpctypes.ErrFutureRev: - lg.Warn( + cm.lg.Warn( "cannot fetch hash from slow remote peer", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -96,9 +141,9 @@ func (s *EtcdServer) CheckInitialHashKV() error { zap.Error(err), ) case rpctypes.ErrCompacted: - lg.Warn( + cm.lg.Warn( "cannot fetch hash from remote peer; local member is behind", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -110,61 +155,31 @@ func (s *EtcdServer) CheckInitialHashKV() error { } } if mismatch > 0 { - return fmt.Errorf("%s found data inconsistency with peers", s.ID()) + return fmt.Errorf("%s found data inconsistency with peers", cm.hasher.MemberId()) } - lg.Info( + cm.lg.Info( "initial corruption checking passed; no corruption", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), ) return nil } -func (s *EtcdServer) monitorKVHash() { - t := s.Cfg.CorruptCheckTime - if t == 0 { - return - } - - lg := s.Logger() - lg.Info( - "enabled corruption checking", - zap.String("local-member-id", s.ID().String()), - zap.Duration("interval", t), - ) - - for { - select { - case <-s.stopping: - return - case <-time.After(t): - } - if !s.isLeader() { - continue - } - if err := s.checkHashKV(); err != nil { - lg.Warn("failed to check hash KV", zap.Error(err)) - } - } -} - -func (s *EtcdServer) checkHashKV() error { - lg := s.Logger() - - h, rev, crev, err := s.kv.HashByRev(0) +func (cm *corruptionMonitor) periodicCheck() error { + h, rev, crev, err := cm.hasher.HashByRev(0) if err != nil { return err } - peers := s.getPeerHashKVs(rev) + peers := cm.hasher.PeerHashByRev(rev) - ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - err = s.linearizableReadNotify(ctx) + ctx, cancel := context.WithTimeout(context.Background(), cm.hasher.ReqTimeout()) + err = cm.hasher.LinearizableReadNotify(ctx) cancel() if err != nil { return err } - h2, rev2, crev2, err := s.kv.HashByRev(0) + h2, rev2, crev2, err := cm.hasher.HashByRev(0) if err != nil { return err } @@ -175,11 +190,11 @@ func (s *EtcdServer) checkHashKV() error { return } alarmed = true - s.triggerCorruptAlarm(id) + cm.hasher.TriggerCorruptAlarm(id) } if h2 != h && rev2 == rev && crev == crev2 { - lg.Warn( + cm.lg.Warn( "found hash mismatch", zap.Int64("revision-1", rev), zap.Int64("compact-revision-1", crev), @@ -188,7 +203,7 @@ func (s *EtcdServer) checkHashKV() error { zap.Int64("compact-revision-2", crev2), zap.Uint32("hash-2", h2), ) - mismatch(uint64(s.ID())) + mismatch(uint64(cm.hasher.MemberId())) } checkedCount := 0 @@ -201,7 +216,7 @@ func (s *EtcdServer) checkHashKV() error { // leader expects follower's latest revision less than or equal to leader's if p.resp.Header.Revision > rev2 { - lg.Warn( + cm.lg.Warn( "revision from follower must be less than or equal to leader's", zap.Int64("leader-revision", rev2), zap.Int64("follower-revision", p.resp.Header.Revision), @@ -212,7 +227,7 @@ func (s *EtcdServer) checkHashKV() error { // leader expects follower's latest compact revision less than or equal to leader's if p.resp.CompactRevision > crev2 { - lg.Warn( + cm.lg.Warn( "compact revision from follower must be less than or equal to leader's", zap.Int64("leader-compact-revision", crev2), zap.Int64("follower-compact-revision", p.resp.CompactRevision), @@ -223,7 +238,7 @@ func (s *EtcdServer) checkHashKV() error { // follower's compact revision is leader's old one, then hashes must match if p.resp.CompactRevision == crev && p.resp.Hash != h { - lg.Warn( + cm.lg.Warn( "same compact revision then hashes must match", zap.Int64("leader-compact-revision", crev2), zap.Uint32("leader-hash", h), @@ -234,7 +249,7 @@ func (s *EtcdServer) checkHashKV() error { mismatch(id) } } - lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) + cm.lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) return nil } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 87148eab4..1fd5a98f8 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2508,6 +2508,34 @@ func (s *EtcdServer) monitorVersions() { } } +func (s *EtcdServer) monitorKVHash() { + t := s.Cfg.CorruptCheckTime + if t == 0 { + return + } + + lg := s.Logger() + lg.Info( + "enabled corruption checking", + zap.String("local-member-id", s.ID().String()), + zap.Duration("interval", t), + ) + monitor := NewCorruptionMonitor(lg, s) + for { + select { + case <-s.stopping: + return + case <-time.After(t): + } + if !s.isLeader() { + continue + } + if err := monitor.periodicCheck(); err != nil { + lg.Warn("failed to check hash KV", zap.Error(err)) + } + } +} + func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index 79c2e6870..9dbb29947 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -112,6 +112,7 @@ const ( type KV interface { ReadView WriteView + Hasher // Read creates a read transaction. Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead @@ -119,12 +120,6 @@ type KV interface { // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite - // Hash computes the hash of the KV's backend. - Hash() (hash uint32, revision int64, err error) - - // HashByRev computes the hash of all MVCC revisions up to a given revision. - HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) - // Compact frees all superseded keys with revisions less than rev. Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) @@ -136,6 +131,14 @@ type KV interface { Close() error } +type Hasher interface { + // Hash computes the hash of the KV's backend. + Hash() (hash uint32, revision int64, err error) + + // HashByRev computes the hash of all MVCC revisions up to a given revision. + HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) +} + // WatchableKV is a KV that can be watched. type WatchableKV interface { KV