diff --git a/server/config/config.go b/server/config/config.go index 9ecfc1463..5206b3dc5 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -147,8 +147,10 @@ type ServerConfig struct { // InitialCorruptCheck is true to check data corruption on boot // before serving any peer/client traffic. - InitialCorruptCheck bool - CorruptCheckTime time.Duration + InitialCorruptCheck bool + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration // PreVote is true to enable Raft Pre-Vote. PreVote bool diff --git a/server/embed/config.go b/server/embed/config.go index 4e1f6a19c..af4e25241 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -320,8 +320,11 @@ type Config struct { // AuthTokenTTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` - ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` - ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` + ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"` + ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"` + // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. 
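For context, a minimal sketch (not part of this change) of how the new embed.Config fields could be turned on when running an embedded server. The field and package names come from the diff above; the data directory and the two-minute interval are arbitrary example values, and the defaults added in NewConfig are disabled / one minute.

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // example data dir

	// New knobs introduced by this change; they default to disabled / 1 minute.
	cfg.ExperimentalCompactHashCheckEnabled = true
	cfg.ExperimentalCompactHashCheckTime = 2 * time.Minute

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify()
	log.Println("embedded etcd is ready; compact hash check runs on the leader")
}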
@@ -521,6 +524,9 @@ func NewConfig() *Config { ExperimentalTxnModeWriteWithSharedBuffer: true, ExperimentalMaxLearners: membership.DefaultMaxLearners, + ExperimentalCompactHashCheckEnabled: false, + ExperimentalCompactHashCheckTime: time.Minute, + V2Deprecation: config.V2_DEPR_DEFAULT, DiscoveryCfg: v3discovery.DiscoveryConfig{ @@ -759,6 +765,10 @@ func (cfg *Config) Validate() error { return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") } + if cfg.ExperimentalCompactHashCheckTime <= 0 { + return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime) + } + return nil } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 564ad5e7a..3d94b63be 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -202,6 +202,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { HostWhitelist: cfg.HostWhitelist, InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled, + CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime, PreVote: cfg.PreVote, Logger: cfg.logger, ForceNewCluster: cfg.ForceNewCluster, @@ -252,7 +254,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { // newly started member ("memberInitialized==false") // does not need corruption check if memberInitialized && srvcfg.InitialCorruptCheck { - if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil { + if err = e.Server.CorruptionChecker().InitialCheck(); err != nil { // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()" // (nothing to close since rafthttp transports have not been started) @@ -344,6 +346,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), + zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled), + zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime), zap.String("auto-compaction-mode", sc.AutoCompactionMode), zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention), zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 28f81e33e..b14191a95 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -259,6 +259,8 @@ func newConfig() *config { // experimental fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") + fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.") + fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hashes.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", 
false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") // TODO: delete in v3.7 diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 152cb0a91..8ccde507c 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -21,7 +21,9 @@ import ( "fmt" "io" "net/http" + "sort" "strings" + "sync" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -32,10 +34,19 @@ import ( "go.uber.org/zap" ) -type corruptionMonitor struct { +type CorruptionChecker interface { + InitialCheck() error + PeriodicCheck() error + CompactHashCheck() +} + +type corruptionChecker struct { lg *zap.Logger hasher Hasher + + mux sync.RWMutex + latestRevisionChecked int64 } type Hasher interface { @@ -47,10 +58,10 @@ type Hasher interface { TriggerCorruptAlarm(uint64) } -func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor { - return &corruptionMonitor{ +func newCorruptionChecker(lg *zap.Logger, s *EtcdServer, storage mvcc.HashStorage) *corruptionChecker { + return &corruptionChecker{ lg: lg, - hasher: hasherAdapter{s, s.KV().HashStorage()}, + hasher: hasherAdapter{s, storage}, } } @@ -74,7 +85,7 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) { // InitialCheck compares initial hash values with its peers // before serving any peer/client traffic. Only mismatch when hashes // are different at requested revision, with same compact revision. -func (cm *corruptionMonitor) InitialCheck() error { +func (cm *corruptionChecker) InitialCheck() error { cm.lg.Info( "starting initial corruption check", @@ -153,7 +164,7 @@ func (cm *corruptionMonitor) InitialCheck() error { return nil } -func (cm *corruptionMonitor) periodicCheck() error { +func (cm *corruptionChecker) PeriodicCheck() error { h, rev, err := cm.hasher.HashByRev(0) if err != nil { return err @@ -241,6 +252,84 @@ func (cm *corruptionMonitor) periodicCheck() error { return nil } +func (cm *corruptionChecker) CompactHashCheck() { + cm.lg.Info("starting compact hash check", + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Duration("timeout", cm.hasher.ReqTimeout()), + ) + hashes := cm.uncheckedRevisions() + // Assume that revisions are ordered from largest to smallest + for i, hash := range hashes { + peers := cm.hasher.PeerHashByRev(hash.Revision) + if len(peers) == 0 { + continue + } + peersChecked := 0 + for _, p := range peers { + if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision { + continue + } + + // follower's compact revision is leader's old one, then hashes must match + if p.resp.Hash != hash.Hash { + cm.hasher.TriggerCorruptAlarm(uint64(p.id)) + cm.lg.Error("failed compaction hash check", + zap.Int64("revision", hash.Revision), + zap.Int64("leader-compact-revision", hash.CompactRevision), + zap.Uint32("leader-hash", hash.Hash), + zap.Int64("follower-compact-revision", p.resp.CompactRevision), + zap.Uint32("follower-hash", p.resp.Hash), + zap.String("follower-peer-id", p.id.String()), + ) + return + } + peersChecked++ + cm.lg.Info("successfully checked hash on follower", + zap.Int64("revision", hash.Revision), + zap.String("peer-id", p.id.String()), + ) + } + if len(peers) == peersChecked { + cm.lg.Info("successfully checked hash on whole cluster", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int64("revision", hash.Revision), + ) + cm.mux.Lock() + if hash.Revision > cm.latestRevisionChecked { + cm.latestRevisionChecked = hash.Revision + } + cm.mux.Unlock() + 
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1)) + return + } + cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int("number-of-peers", len(peers)), + zap.Int64("revision", hash.Revision), + ) + } + cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes))) + return +} + +func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash { + cm.mux.RLock() + lastRevisionChecked := cm.latestRevisionChecked + cm.mux.RUnlock() + + hashes := cm.hasher.Hashes() + // Sort in descending order + sort.Slice(hashes, func(i, j int) bool { + return hashes[i].Revision > hashes[j].Revision + }) + for i, hash := range hashes { + if hash.Revision <= lastRevisionChecked { + return hashes[:i] + } + } + return hashes +} + func (s *EtcdServer) triggerCorruptAlarm(id uint64) { a := &pb.AlarmRequest{ MemberID: id, diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index d2b976f74..8a125db6e 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -88,7 +88,7 @@ func TestInitialCheck(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - monitor := corruptionMonitor{ + monitor := corruptionChecker{ lg: zaptest.NewLogger(t), hasher: &tc.hasher, } @@ -205,11 +205,11 @@ func TestPeriodicCheck(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - monitor := corruptionMonitor{ + monitor := corruptionChecker{ lg: zaptest.NewLogger(t), hasher: &tc.hasher, } - err := monitor.periodicCheck() + err := monitor.PeriodicCheck() if gotError := err != nil; gotError != tc.expectError { t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError) } @@ -221,11 +221,101 @@ func TestPeriodicCheck(t *testing.T) { } } +func TestCompactHashCheck(t *testing.T) { + tcs := []struct { + name string + hasher fakeHasher + lastRevisionChecked int64 + + expectError bool + expectCorrupt bool + expectActions []string + expectLastRevisionChecked int64 + }{ + { + name: "No hashes", + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"}, + }, + { + name: "No peers, check new checked from largest to smallest", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}}, + }, + lastRevisionChecked: 2, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"}, + expectLastRevisionChecked: 2, + }, + { + name: "Peer error", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}}, + peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"}, + }, + { + name: "Peer returned different compaction revision is skipped", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}}, + peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"}, + }, + { + name: "Peer returned same compaction revision but different hash triggers alarm", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}}, + peerHashes: 
[]*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"}, + expectCorrupt: true, + }, + { + name: "Peer returned same hash bumps last revision checked", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"}, + expectLastRevisionChecked: 2, + }, + { + name: "Only one peer succeeded check", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{ + {resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}, + {err: fmt.Errorf("failed getting hash")}, + }, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + monitor := corruptionChecker{ + latestRevisionChecked: tc.lastRevisionChecked, + lg: zaptest.NewLogger(t), + hasher: &tc.hasher, + } + monitor.CompactHashCheck() + if tc.hasher.alarmTriggered != tc.expectCorrupt { + t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt) + } + if tc.expectLastRevisionChecked != monitor.latestRevisionChecked { + t.Errorf("Unexpected last revision checked, got: %v, expected?: %v", monitor.latestRevisionChecked, tc.expectLastRevisionChecked) + } + assert.Equal(t, tc.expectActions, tc.hasher.actions) + }) + } +} + type fakeHasher struct { peerHashes []*peerHashKVResp hashByRevIndex int hashByRevResponses []hashByRev linearizableReadNotify error + hashes []mvcc.KeyValueHash alarmTriggered bool actions []string @@ -251,8 +341,14 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6 return hashByRev.hash, hashByRev.revision, hashByRev.err } -func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) { - panic("not implemented") +func (f *fakeHasher) Store(hash mvcc.KeyValueHash) { + f.actions = append(f.actions, fmt.Sprintf("Store(%v)", hash)) + f.hashes = append(f.hashes, hash) +} + +func (f *fakeHasher) Hashes() []mvcc.KeyValueHash { + f.actions = append(f.actions, "Hashes()") + return f.hashes } func (f *fakeHasher) ReqTimeout() time.Duration { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e98bb3259..99a2159d9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -295,7 +295,8 @@ type EtcdServer struct { *AccessController // forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount. // Should only be set within apply code path. Used to force snapshot after cluster version downgrade. - forceSnapshot bool + forceSnapshot bool + corruptionChecker CorruptionChecker } // NewServer creates a new EtcdServer from the supplied configuration. 
The @@ -371,6 +372,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { CompactionSleepInterval: cfg.CompactionSleepInterval, } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) + srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage()) srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost)) @@ -530,6 +532,7 @@ func (s *EtcdServer) Start() { s.GoAttach(s.monitorStorageVersion) s.GoAttach(s.linearizableReadLoop) s.GoAttach(s.monitorKVHash) + s.GoAttach(s.monitorCompactHash) s.GoAttach(s.monitorDowngrade) } @@ -2199,7 +2202,6 @@ func (s *EtcdServer) monitorKVHash() { zap.String("local-member-id", s.MemberId().String()), zap.Duration("interval", t), ) - monitor := NewCorruptionMonitor(lg, s) for { select { case <-s.stopping: @@ -2209,12 +2211,30 @@ func (s *EtcdServer) monitorKVHash() { if !s.isLeader() { continue } - if err := monitor.periodicCheck(); err != nil { + if err := s.corruptionChecker.PeriodicCheck(); err != nil { lg.Warn("failed to check hash KV", zap.Error(err)) } } } +func (s *EtcdServer) monitorCompactHash() { + if !s.Cfg.CompactHashCheckEnabled { + return + } + t := s.Cfg.CompactHashCheckTime + for { + select { + case <-time.After(t): + case <-s.stopping: + return + } + if !s.isLeader() { + continue + } + s.corruptionChecker.CompactHashCheck() + } +} + func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() @@ -2416,3 +2436,7 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { } } } + +func (s *EtcdServer) CorruptionChecker() CorruptionChecker { + return s.corruptionChecker +} diff --git a/server/storage/mvcc/hash.go b/server/storage/mvcc/hash.go index 696ddd216..3d30aa42e 100644 --- a/server/storage/mvcc/hash.go +++ b/server/storage/mvcc/hash.go @@ -93,6 +93,9 @@ type HashStorage interface { // Store adds hash value in local cache, allowing it can be returned by HashByRev. Store(valueHash KeyValueHash) + + // Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes. 
+ Hashes() []KeyValueHash } type hashStorage struct { @@ -146,3 +149,14 @@ func (s *hashStorage) Store(hash KeyValueHash) { s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:] } } + +func (s *hashStorage) Hashes() []KeyValueHash { + s.hashMu.RLock() + // Copy out hashes under lock just to be safe + hashes := make([]KeyValueHash, 0, len(s.hashes)) + for _, hash := range s.hashes { + hashes = append(hashes, hash) + } + s.hashMu.RUnlock() + return hashes +} diff --git a/server/storage/mvcc/testutil/hash.go b/server/storage/mvcc/testutil/hash.go index fbd37f2d6..7d6d1354b 100644 --- a/server/storage/mvcc/testutil/hash.go +++ b/server/storage/mvcc/testutil/hash.go @@ -16,10 +16,14 @@ package testutil import ( "context" + "errors" "fmt" + "os" "testing" "github.com/stretchr/testify/assert" + "go.etcd.io/bbolt" + "go.etcd.io/etcd/api/v3/mvccpb" ) const ( @@ -103,3 +107,40 @@ func PickKey(i int64) string { panic("Can't count") } } + +func CorruptBBolt(fpath string) error { + db, derr := bbolt.Open(fpath, os.ModePerm, &bbolt.Options{}) + if derr != nil { + return derr + } + defer db.Close() + + return db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte("key")) + if b == nil { + return errors.New("got nil bucket for 'key'") + } + keys, vals := [][]byte{}, [][]byte{} + c := b.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + keys = append(keys, k) + var kv mvccpb.KeyValue + if uerr := kv.Unmarshal(v); uerr != nil { + return uerr + } + kv.Key[0]++ + kv.Value[0]++ + v2, v2err := kv.Marshal() + if v2err != nil { + return v2err + } + vals = append(vals, v2) + } + for i := range keys { + if perr := b.Put(keys[i], vals[i]); perr != nil { + return perr + } + } + return nil + }) +} diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go new file mode 100644 index 000000000..ae8c32350 --- /dev/null +++ b/tests/e2e/corrupt_test.go @@ -0,0 +1,182 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package e2e + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/storage/datadir" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" + "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestEtcdCorruptHash(t *testing.T) { + // oldenv := os.Getenv("EXPECT_DEBUG") + // defer os.Setenv("EXPECT_DEBUG", oldenv) + // os.Setenv("EXPECT_DEBUG", "1") + + cfg := e2e.NewConfigNoTLS() + + // trigger snapshot so that restart member can load peers from disk + cfg.SnapshotCount = 3 + + testCtl(t, corruptTest, withQuorum(), + withCfg(*cfg), + withInitialCorruptCheck(), + withCorruptFunc(testutil.CorruptBBolt), + ) +} + +func corruptTest(cx ctlCtx) { + cx.t.Log("putting 10 keys...") + for i := 0; i < 10; i++ { + if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil { + if cx.dialTimeout > 0 && !isGRPCTimedout(err) { + cx.t.Fatalf("putTest ctlV3Put error (%v)", err) + } + } + } + // enough time for all nodes sync on the same data + cx.t.Log("sleeping 3sec to let nodes sync...") + time.Sleep(3 * time.Second) + + cx.t.Log("connecting clientv3...") + eps := cx.epc.EndpointsV3() + cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second}) + if err != nil { + cx.t.Fatal(err) + } + defer cli1.Close() + + sresp, err := cli1.Status(context.TODO(), eps[0]) + cx.t.Logf("checked status sresp:%v err:%v", sresp, err) + if err != nil { + cx.t.Fatal(err) + } + id0 := sresp.Header.GetMemberId() + + cx.t.Log("stopping etcd[0]...") + cx.epc.Procs[0].Stop() + + // corrupting first member by modifying backend offline. + fp := datadir.ToBackendFileName(cx.epc.Procs[0].Config().DataDirPath) + cx.t.Logf("corrupting backend: %v", fp) + if err = cx.corruptFunc(fp); err != nil { + cx.t.Fatal(err) + } + + cx.t.Log("restarting etcd[0]") + ep := cx.epc.Procs[0] + proc, err := e2e.SpawnCmd(append([]string{ep.Config().ExecPath}, ep.Config().Args...), cx.envMap) + if err != nil { + cx.t.Fatal(err) + } + defer proc.Stop() + + cx.t.Log("waiting for etcd[0] failure...") + // restarting corrupted member should fail + e2e.WaitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) +} + +func TestPeriodicCheckDetectsCorruption(t *testing.T) { + checkTime := time.Second + e2e.BeforeTest(t) + epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + ClusterSize: 3, + KeepDataDir: true, + CorruptCheckTime: time.Second, + }) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + cc := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()) + + for i := 0; i < 10; i++ { + err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{}) + assert.NoError(t, err, "error on put") + } + + epc.Procs[0].Stop() + err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath)) + assert.NoError(t, err) + + err = epc.Procs[0].Restart() + assert.NoError(t, err) + time.Sleep(checkTime * 11 / 10) + alarmResponse, err := cc.AlarmList() + assert.NoError(t, err, "error on alarm list") + // TODO: Investigate why MemberID is 0? 
+ assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms) +} + +func TestCompactHashCheckDetectCorruption(t *testing.T) { + checkTime := time.Second + e2e.BeforeTest(t) + epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + ClusterSize: 3, + KeepDataDir: true, + CompactHashCheckEnabled: true, + CompactHashCheckTime: checkTime, + }) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + cc := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()) + + for i := 0; i < 10; i++ { + err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{}) + assert.NoError(t, err, "error on put") + } + members, err := cc.MemberList() + assert.NoError(t, err, "error on member list") + var memberID uint64 + for _, m := range members.Members { + if m.Name == epc.Procs[0].Config().Name { + memberID = m.ID + } + } + + epc.Procs[0].Stop() + err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath)) + assert.NoError(t, err) + + err = epc.Procs[0].Restart() + assert.NoError(t, err) + _, err = cc.Compact(5, config.CompactOption{}) + assert.NoError(t, err) + time.Sleep(checkTime * 11 / 10) + alarmResponse, err := cc.AlarmList() + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms) +} diff --git a/tests/e2e/etcd_corrupt_test.go b/tests/e2e/etcd_corrupt_test.go deleted file mode 100644 index dc34702b7..000000000 --- a/tests/e2e/etcd_corrupt_test.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2017 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package e2e - -import ( - "context" - "errors" - "fmt" - "os" - "testing" - "time" - - bolt "go.etcd.io/bbolt" - "go.etcd.io/etcd/api/v3/mvccpb" - "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/storage/datadir" - "go.etcd.io/etcd/tests/v3/framework/e2e" -) - -// TODO: test with embedded etcd in integration package - -func TestEtcdCorruptHash(t *testing.T) { - // oldenv := os.Getenv("EXPECT_DEBUG") - // defer os.Setenv("EXPECT_DEBUG", oldenv) - // os.Setenv("EXPECT_DEBUG", "1") - - cfg := e2e.NewConfigNoTLS() - - // trigger snapshot so that restart member can load peers from disk - cfg.SnapshotCount = 3 - - testCtl(t, corruptTest, withQuorum(), - withCfg(*cfg), - withInitialCorruptCheck(), - withCorruptFunc(corruptHash), - ) -} - -func corruptTest(cx ctlCtx) { - cx.t.Log("putting 10 keys...") - for i := 0; i < 10; i++ { - if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil { - if cx.dialTimeout > 0 && !isGRPCTimedout(err) { - cx.t.Fatalf("putTest ctlV3Put error (%v)", err) - } - } - } - // enough time for all nodes sync on the same data - cx.t.Log("sleeping 3sec to let nodes sync...") - time.Sleep(3 * time.Second) - - cx.t.Log("connecting clientv3...") - eps := cx.epc.EndpointsV3() - cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second}) - if err != nil { - cx.t.Fatal(err) - } - defer cli1.Close() - - sresp, err := cli1.Status(context.TODO(), eps[0]) - cx.t.Logf("checked status sresp:%v err:%v", sresp, err) - if err != nil { - cx.t.Fatal(err) - } - id0 := sresp.Header.GetMemberId() - - cx.t.Log("stopping etcd[0]...") - cx.epc.Procs[0].Stop() - - // corrupting first member by modifying backend offline. - fp := datadir.ToBackendFileName(cx.epc.Procs[0].Config().DataDirPath) - cx.t.Logf("corrupting backend: %v", fp) - if err = cx.corruptFunc(fp); err != nil { - cx.t.Fatal(err) - } - - cx.t.Log("restarting etcd[0]") - ep := cx.epc.Procs[0] - proc, err := e2e.SpawnCmd(append([]string{ep.Config().ExecPath}, ep.Config().Args...), cx.envMap) - if err != nil { - cx.t.Fatal(err) - } - defer proc.Stop() - - cx.t.Log("waiting for etcd[0] failure...") - // restarting corrupted member should fail - e2e.WaitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) -} - -func corruptHash(fpath string) error { - db, derr := bolt.Open(fpath, os.ModePerm, &bolt.Options{}) - if derr != nil { - return derr - } - defer db.Close() - - return db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte("key")) - if b == nil { - return errors.New("got nil bucket for 'key'") - } - keys, vals := [][]byte{}, [][]byte{} - c := b.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - keys = append(keys, k) - var kv mvccpb.KeyValue - if uerr := kv.Unmarshal(v); uerr != nil { - return uerr - } - kv.Key[0]++ - kv.Value[0]++ - v2, v2err := kv.Marshal() - if v2err != nil { - return v2err - } - vals = append(vals, v2) - } - for i := range keys { - if perr := b.Put(keys[i], vals[i]); perr != nil { - return perr - } - } - return nil - }) -} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index fece5f5b0..411bc34b1 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -177,7 +177,10 @@ type EtcdProcessClusterConfig struct { DiscoveryToken string LogLevel string - MaxConcurrentStreams uint32 // default is math.MaxUint32 + MaxConcurrentStreams uint32 // default is math.MaxUint32 + CorruptCheckTime time.Duration + 
CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -347,6 +350,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) } + if cfg.CorruptCheckTime != 0 { + args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime)) + } + if cfg.CompactHashCheckEnabled { + args = append(args, "--experimental-compact-hash-check-enabled") + } + if cfg.CompactHashCheckTime != 0 { + args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) + } + etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg, ExecPath: cfg.ExecPath, diff --git a/tests/go.mod b/tests/go.mod index ba279ef14..0ce05f471 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -28,7 +28,6 @@ require ( github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.2 - go.etcd.io/bbolt v1.3.6 go.etcd.io/etcd/api/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v2 v2.306.0-alpha.0 @@ -80,6 +79,7 @@ require ( github.com/sirupsen/logrus v1.8.1 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + go.etcd.io/bbolt v1.3.6 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect diff --git a/tests/integration/corrupt_test.go b/tests/integration/corrupt_test.go new file mode 100644 index 000000000..b96d8465e --- /dev/null +++ b/tests/integration/corrupt_test.go @@ -0,0 +1,175 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package integration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/etcdserverpb" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" + "go.etcd.io/etcd/tests/v3/framework/integration" +) + +func TestPeriodicCheck(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + var totalRevisions int64 = 1210 + var rev int64 + for ; rev < totalRevisions; rev += testutil.CompactionCycle { + testPeriodicCheck(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle) + } + testPeriodicCheck(ctx, t, cc, clus, rev, rev+totalRevisions) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember(nil), alarmResponse.Alarms) +} + +func testPeriodicCheck(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *integration.Cluster, start, stop int64) { + for i := start; i <= stop; i++ { + if i%67 == 0 { + _, err := cc.Delete(ctx, testutil.PickKey(i+83)) + assert.NoError(t, err, "error on delete") + } else { + _, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + } + err := clus.Members[0].Server.CorruptionChecker().PeriodicCheck() + assert.NoError(t, err, "error on periodic check (rev %v)", stop) +} + +func TestPeriodicCheckDetectsCorruption(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + for i := 0; i < 10; i++ { + _, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + + err = clus.Members[0].Server.CorruptionChecker().PeriodicCheck() + assert.NoError(t, err, "error on periodic check") + clus.Members[0].Stop(t) + clus.WaitLeader(t) + + err = testutil.CorruptBBolt(clus.Members[0].BackendPath()) + assert.NoError(t, err) + + err = clus.Members[0].Restart(t) + assert.NoError(t, err) + time.Sleep(50 * time.Millisecond) + leader := clus.WaitLeader(t) + err = clus.Members[leader].Server.CorruptionChecker().PeriodicCheck() + assert.NoError(t, err, "error on periodic check") + time.Sleep(50 * time.Millisecond) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + // TODO: Investigate why MemberID is 0? 
+ assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms) +} + +func TestCompactHashCheck(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + var totalRevisions int64 = 1210 + var rev int64 + for ; rev < totalRevisions; rev += testutil.CompactionCycle { + testCompactionHash(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle) + } + testCompactionHash(ctx, t, cc, clus, rev, rev+totalRevisions) +} + +func testCompactionHash(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *integration.Cluster, start, stop int64) { + for i := start; i <= stop; i++ { + if i%67 == 0 { + _, err := cc.Delete(ctx, testutil.PickKey(i+83)) + assert.NoError(t, err, "error on delete") + } else { + _, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + } + _, err := cc.Compact(ctx, stop) + assert.NoError(t, err, "error on compact (rev %v)", stop) + // Wait for compaction to be compacted + time.Sleep(50 * time.Millisecond) + + clus.Members[0].Server.CorruptionChecker().CompactHashCheck() +} + +func TestCompactHashCheckDetectCorruption(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + for i := 0; i < 10; i++ { + _, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + + clus.Members[0].Server.CorruptionChecker().CompactHashCheck() + clus.Members[0].Stop(t) + clus.WaitLeader(t) + + err = testutil.CorruptBBolt(clus.Members[0].BackendPath()) + assert.NoError(t, err) + + err = clus.Members[0].Restart(t) + assert.NoError(t, err) + _, err = cc.Compact(ctx, 5) + assert.NoError(t, err) + time.Sleep(50 * time.Millisecond) + leader := clus.WaitLeader(t) + clus.Members[leader].Server.CorruptionChecker().CompactHashCheck() + time.Sleep(50 * time.Millisecond) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms) +}
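To close, a short sketch of how a CORRUPT alarm raised by the new check could be observed from a client once the feature is enabled via --experimental-compact-hash-check-enabled. This assumes only the standard clientv3 maintenance API; the endpoint is a hypothetical example.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // hypothetical endpoint
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// List active alarms; a member that failed the compact hash check
	// is reported with AlarmType_CORRUPT by the leader.
	resp, err := cli.AlarmList(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for _, a := range resp.Alarms {
		if a.Alarm == pb.AlarmType_CORRUPT {
			fmt.Printf("member %x flagged as corrupt\n", a.MemberID)
		}
	}
}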