// Patch summary (read off the hunks below; lines between hunks are not shown here).
//
// server/embed/etcd.go
//   StartEtcd runs the initial corruption check only when
//   memberInitialized && srvcfg.InitialCorruptCheck, and does so through
//   etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck()
//   instead of e.Server.CheckInitialHashKV().
//
// server/etcdserver/corrupt.go
//   Adds corruptionMonitor (a logger plus a Hasher), the Hasher interface, and
//   hasherAdapter, which embeds *EtcdServer. EtcdServer.CheckInitialHashKV becomes
//   corruptionMonitor.InitialCheck and EtcdServer.checkHashKV becomes
//   corruptionMonitor.periodicCheck; both now go through cm.hasher and cm.lg
//   rather than s.kv, s.Cfg, s.getPeerHashKVs, s.linearizableReadNotify,
//   s.triggerCorruptAlarm and s.Logger(). The "if !s.Cfg.InitialCorruptCheck
//   { return nil }" early return is removed from the check itself.
//
// server/etcdserver/server.go
//   monitorKVHash is re-added here with the same loop; the only differences are
//   that it builds one monitor via NewCorruptionMonitor(lg, s) before the loop
//   and calls monitor.periodicCheck() where it used to call s.checkHashKV().
//
// server/storage/mvcc/kv.go
//   Hash and HashByRev are moved out of the KV interface into a new Hasher
//   interface, which KV now embeds.
//
// NOTE(review): hasherAdapter defines Hash, HashByRev, ReqTimeout, PeerHashByRev
//   and TriggerCorruptAlarm only. MemberId and LinearizableReadNotify, which the
//   etcdserver Hasher interface also requires, presumably come from the embedded
//   *EtcdServer - confirm both exist there with matching signatures.
// NOTE(review): inside InitialCheck's "switch p.err" the unchanged context lines
//   log zap.Error(err). The function returns early when err != nil right after
//   HashByRev(0), so this looks like it logs a nil error where p.err was
//   intended - confirm against the full file before changing it.
// NOTE(review): InitialCheck no longer consults InitialCorruptCheck, so any
//   caller other than StartEtcd has to apply that gate itself.
diff --git a/server/embed/etcd.go b/server/embed/etcd.go index ffd239c79..5838c89cb 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -250,8 +250,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { // newly started member ("memberInitialized==false") // does not need corruption check - if memberInitialized { - if err = e.Server.CheckInitialHashKV(); err != nil { + if memberInitialized && srvcfg.InitialCorruptCheck { + if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index c8a0437dc..77a4d6a41 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -32,33 +32,74 @@ import ( "go.uber.org/zap" ) -// CheckInitialHashKV compares initial hash values with its peers +type corruptionMonitor struct { + lg *zap.Logger + + hasher Hasher +} + +type Hasher interface { + mvcc.Hasher + ReqTimeout() time.Duration + MemberId() types.ID + PeerHashByRev(int64) []*peerHashKVResp + LinearizableReadNotify(context.Context) error + TriggerCorruptAlarm(uint64) +} + +func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { + return &corruptionMonitor{ + lg: lg, + hasher: hasherAdapter{s}, + } +} + +type hasherAdapter struct { + *EtcdServer +} + +func (h hasherAdapter) Hash() (hash uint32, revision int64, err error) { + return h.EtcdServer.KV().Hash() +} + +func (h hasherAdapter) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) { + return h.EtcdServer.KV().HashByRev(rev) +} + +func (h hasherAdapter) ReqTimeout() time.Duration { + return h.EtcdServer.Cfg.ReqTimeout() +} + +func (h hasherAdapter) PeerHashByRev(rev int64) []*peerHashKVResp { + return h.EtcdServer.getPeerHashKVs(rev) +} + +func (h hasherAdapter) TriggerCorruptAlarm(memberID 
uint64) { + h.EtcdServer.triggerCorruptAlarm(memberID) +} + +// InitialCheck compares initial hash values with its peers // before serving any peer/client traffic. Only mismatch when hashes // are different at requested revision, with same compact revision. -func (s *EtcdServer) CheckInitialHashKV() error { - if !s.Cfg.InitialCorruptCheck { - return nil - } +func (cm *corruptionMonitor) InitialCheck() error { - lg := s.Logger() - - lg.Info( + cm.lg.Info( "starting initial corruption check", - zap.String("local-member-id", s.MemberId().String()), - zap.Duration("timeout", s.Cfg.ReqTimeout()), + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Duration("timeout", cm.hasher.ReqTimeout()), ) - h, rev, crev, err := s.kv.HashByRev(0) + h, rev, crev, err := cm.hasher.HashByRev(0) if err != nil { - return fmt.Errorf("%s failed to fetch hash (%v)", s.MemberId(), err) + return fmt.Errorf("%s failed to fetch hash (%v)", cm.hasher.MemberId(), err) } - peers := s.getPeerHashKVs(rev) + peers := cm.hasher.PeerHashByRev(rev) mismatch := 0 for _, p := range peers { if p.resp != nil { peerID := types.ID(p.resp.Header.MemberId) fields := []zap.Field{ - zap.String("local-member-id", s.MemberId().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -71,10 +112,10 @@ func (s *EtcdServer) CheckInitialHashKV() error { if h != p.resp.Hash { if crev == p.resp.CompactRevision { - lg.Warn("found different hash values from remote peer", fields...) + cm.lg.Warn("found different hash values from remote peer", fields...) mismatch++ } else { - lg.Warn("found different compact revision values from remote peer", fields...) + cm.lg.Warn("found different compact revision values from remote peer", fields...) 
} } @@ -84,9 +125,9 @@ func (s *EtcdServer) CheckInitialHashKV() error { if p.err != nil { switch p.err { case rpctypes.ErrFutureRev: - lg.Warn( + cm.lg.Warn( "cannot fetch hash from slow remote peer", - zap.String("local-member-id", s.MemberId().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -95,9 +136,9 @@ func (s *EtcdServer) CheckInitialHashKV() error { zap.Error(err), ) case rpctypes.ErrCompacted: - lg.Warn( + cm.lg.Warn( "cannot fetch hash from remote peer; local member is behind", - zap.String("local-member-id", s.MemberId().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -109,61 +150,31 @@ func (s *EtcdServer) CheckInitialHashKV() error { } } if mismatch > 0 { - return fmt.Errorf("%s found data inconsistency with peers", s.MemberId()) + return fmt.Errorf("%s found data inconsistency with peers", cm.hasher.MemberId()) } - lg.Info( + cm.lg.Info( "initial corruption checking passed; no corruption", - zap.String("local-member-id", s.MemberId().String()), + zap.String("local-member-id", cm.hasher.MemberId().String()), ) return nil } -func (s *EtcdServer) monitorKVHash() { - t := s.Cfg.CorruptCheckTime - if t == 0 { - return - } - - lg := s.Logger() - lg.Info( - "enabled corruption checking", - zap.String("local-member-id", s.MemberId().String()), - zap.Duration("interval", t), - ) - - for { - select { - case <-s.stopping: - return - case <-time.After(t): - } - if !s.isLeader() { - continue - } - if err := s.checkHashKV(); err != nil { - lg.Warn("failed to check hash KV", zap.Error(err)) - } - } -} - -func (s *EtcdServer) checkHashKV() error { - lg := s.Logger() - - h, rev, crev, err := s.kv.HashByRev(0) +func (cm *corruptionMonitor) periodicCheck() 
error { + h, rev, crev, err := cm.hasher.HashByRev(0) if err != nil { return err } - peers := s.getPeerHashKVs(rev) + peers := cm.hasher.PeerHashByRev(rev) - ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - err = s.linearizableReadNotify(ctx) + ctx, cancel := context.WithTimeout(context.Background(), cm.hasher.ReqTimeout()) + err = cm.hasher.LinearizableReadNotify(ctx) cancel() if err != nil { return err } - h2, rev2, crev2, err := s.kv.HashByRev(0) + h2, rev2, crev2, err := cm.hasher.HashByRev(0) if err != nil { return err } @@ -174,11 +185,11 @@ func (s *EtcdServer) checkHashKV() error { return } alarmed = true - s.triggerCorruptAlarm(id) + cm.hasher.TriggerCorruptAlarm(id) } if h2 != h && rev2 == rev && crev == crev2 { - lg.Warn( + cm.lg.Warn( "found hash mismatch", zap.Int64("revision-1", rev), zap.Int64("compact-revision-1", crev), @@ -187,7 +198,7 @@ func (s *EtcdServer) checkHashKV() error { zap.Int64("compact-revision-2", crev2), zap.Uint32("hash-2", h2), ) - mismatch(uint64(s.MemberId())) + mismatch(uint64(cm.hasher.MemberId())) } checkedCount := 0 @@ -200,7 +211,7 @@ func (s *EtcdServer) checkHashKV() error { // leader expects follower's latest revision less than or equal to leader's if p.resp.Header.Revision > rev2 { - lg.Warn( + cm.lg.Warn( "revision from follower must be less than or equal to leader's", zap.Int64("leader-revision", rev2), zap.Int64("follower-revision", p.resp.Header.Revision), @@ -211,7 +222,7 @@ func (s *EtcdServer) checkHashKV() error { // leader expects follower's latest compact revision less than or equal to leader's if p.resp.CompactRevision > crev2 { - lg.Warn( + cm.lg.Warn( "compact revision from follower must be less than or equal to leader's", zap.Int64("leader-compact-revision", crev2), zap.Int64("follower-compact-revision", p.resp.CompactRevision), @@ -222,7 +233,7 @@ func (s *EtcdServer) checkHashKV() error { // follower's compact revision is leader's old one, then hashes must match if 
p.resp.CompactRevision == crev && p.resp.Hash != h { - lg.Warn( + cm.lg.Warn( "same compact revision then hashes must match", zap.Int64("leader-compact-revision", crev2), zap.Uint32("leader-hash", h), @@ -233,7 +244,7 @@ func (s *EtcdServer) checkHashKV() error { mismatch(id) } } - lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) + cm.lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) return nil } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index ce5a65fda..18d3a0fa1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2187,6 +2187,34 @@ func (s *EtcdServer) monitorStorageVersion() { } } +func (s *EtcdServer) monitorKVHash() { + t := s.Cfg.CorruptCheckTime + if t == 0 { + return + } + + lg := s.Logger() + lg.Info( + "enabled corruption checking", + zap.String("local-member-id", s.MemberId().String()), + zap.Duration("interval", t), + ) + monitor := NewCorruptionMonitor(lg, s) + for { + select { + case <-s.stopping: + return + case <-time.After(t): + } + if !s.isLeader() { + continue + } + if err := monitor.periodicCheck(); err != nil { + lg.Warn("failed to check hash KV", zap.Error(err)) + } + } +} + func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() diff --git a/server/storage/mvcc/kv.go b/server/storage/mvcc/kv.go index 10c4821b1..7fe9e0921 100644 --- a/server/storage/mvcc/kv.go +++ b/server/storage/mvcc/kv.go @@ -112,6 +112,7 @@ const ( type KV interface { ReadView WriteView + Hasher // Read creates a read transaction. Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead @@ -119,12 +120,6 @@ type KV interface { // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite - // Hash computes the hash of the KV's backend. - Hash() (hash uint32, revision int64, err error) - - // HashByRev computes the hash of all MVCC revisions up to a given revision. 
- HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) - // Compact frees all superseded keys with revisions less than rev. Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) @@ -136,6 +131,14 @@ type KV interface { Close() error } +type Hasher interface { + // Hash computes the hash of the KV's backend. + Hash() (hash uint32, revision int64, err error) + + // HashByRev computes the hash of all MVCC revisions up to a given revision. + HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) +} + // WatchableKV is a KV that can be watched. type WatchableKV interface { KV