From 21fb173f76e9962ee19cb16308941194aae5936d Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 20 May 2022 12:11:18 +0200 Subject: [PATCH] server: Implement compaction hash checking Signed-off-by: Marek Siarkowicz --- server/etcdserver/corrupt.go | 89 +++++++++++++++++++++++++- server/etcdserver/corrupt_test.go | 100 +++++++++++++++++++++++++++++- server/etcdserver/server.go | 20 +++++- server/mvcc/hash.go | 14 +++++ tests/e2e/corrupt_test.go | 45 ++++++++++++++ tests/integration/corrupt_test.go | 73 ++++++++++++++++++++++ 6 files changed, 335 insertions(+), 6 deletions(-) diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index dc7e77548..c148c8730 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -21,7 +21,9 @@ import ( "fmt" "io/ioutil" "net/http" + "sort" "strings" + "sync" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -36,12 +38,16 @@ import ( type CorruptionChecker interface { InitialCheck() error PeriodicCheck() error + CompactHashCheck() } type corruptionChecker struct { lg *zap.Logger hasher Hasher + + mux sync.RWMutex + latestRevisionChecked int64 } type Hasher interface { @@ -53,10 +59,10 @@ type Hasher interface { TriggerCorruptAlarm(uint64) } -func NewCorruptionChecker(lg *zap.Logger, s *EtcdServer) *corruptionChecker { +func newCorruptionChecker(lg *zap.Logger, s *EtcdServer, storage mvcc.HashStorage) *corruptionChecker { return &corruptionChecker{ lg: lg, - hasher: hasherAdapter{s, s.KV().HashStorage()}, + hasher: hasherAdapter{s, storage}, } } @@ -251,6 +257,85 @@ func (cm *corruptionChecker) PeriodicCheck() error { return nil } +func (cm *corruptionChecker) CompactHashCheck() { + cm.lg.Info("starting compact hash check", + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Duration("timeout", cm.hasher.ReqTimeout()), + ) + hashes := cm.uncheckedRevisions() + // Assume that revisions are ordered from largest to smallest + for i, hash := range hashes { + peers := cm.hasher.PeerHashByRev(hash.Revision) + if len(peers) == 0 { + continue + } + peersChecked := 0 + for _, p := range peers { + if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision { + continue + } + + // follower's compact revision is leader's old one, then hashes must match + if p.resp.Hash != hash.Hash { + cm.hasher.TriggerCorruptAlarm(uint64(p.id)) + cm.lg.Error("failed compaction hash check", + zap.Int64("revision", hash.Revision), + zap.Int64("leader-compact-revision", hash.CompactRevision), + zap.Uint32("leader-hash", hash.Hash), + zap.Int64("follower-compact-revision", p.resp.CompactRevision), + zap.Uint32("follower-hash", p.resp.Hash), + zap.String("follower-peer-id", p.id.String()), + ) + return + } + peersChecked++ + cm.lg.Info("successfully checked hash on follower", + zap.Int64("revision", hash.Revision), + zap.String("peer-id", p.id.String()), + ) + } + if len(peers) == peersChecked { + cm.lg.Info("successfully checked hash on whole cluster", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int64("revision", hash.Revision), + ) + cm.mux.Lock() + if hash.Revision > cm.latestRevisionChecked { + cm.latestRevisionChecked = hash.Revision + } + cm.mux.Unlock() + cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1)) + return + } else { + cm.lg.Warn("skipped checking hash; was not able to check all peers", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int("number-of-peers", len(peers)), + zap.Int64("revision", hash.Revision), + ) + } + } + cm.lg.Info("finished 
compaction hash check", zap.Int("number-of-hashes-checked", len(hashes))) + return +} + +func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash { + cm.mux.RLock() + lastRevisionChecked := cm.latestRevisionChecked + cm.mux.RUnlock() + + hashes := cm.hasher.Hashes() + // Sort in descending order + sort.Slice(hashes, func(i, j int) bool { + return hashes[i].Revision > hashes[j].Revision + }) + for i, hash := range hashes { + if hash.Revision <= lastRevisionChecked { + return hashes[:i] + } + } + return hashes +} + func (s *EtcdServer) triggerCorruptAlarm(id uint64) { a := &pb.AlarmRequest{ MemberID: id, diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 77a1a859d..9e4030714 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -221,11 +221,101 @@ func TestPeriodicCheck(t *testing.T) { } } +func TestCompactHashCheck(t *testing.T) { + tcs := []struct { + name string + hasher fakeHasher + lastRevisionChecked int64 + + expectError bool + expectCorrupt bool + expectActions []string + expectLastRevisionChecked int64 + }{ + { + name: "No hashes", + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"}, + }, + { + name: "No peers, check new checked from largest to smallest", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}}, + }, + lastRevisionChecked: 2, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"}, + expectLastRevisionChecked: 2, + }, + { + name: "Peer error", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}}, + peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"}, + }, + { + name: "Peer returned different compaction revision is skipped", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}}, + peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"}, + }, + { + name: "Peer returned same compaction revision but different hash triggers alarm", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}}, + peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"}, + expectCorrupt: true, + }, + { + name: "Peer returned same hash bumps last revision checked", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"}, + expectLastRevisionChecked: 2, + }, + { + name: "Only one peer succeeded check", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{ + {resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}, + {err: fmt.Errorf("failed getting 
hash")}, + }, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + monitor := corruptionChecker{ + latestRevisionChecked: tc.lastRevisionChecked, + lg: zaptest.NewLogger(t), + hasher: &tc.hasher, + } + monitor.CompactHashCheck() + if tc.hasher.alarmTriggered != tc.expectCorrupt { + t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt) + } + if tc.expectLastRevisionChecked != monitor.latestRevisionChecked { + t.Errorf("Unexpected last revision checked, got: %v, expected?: %v", monitor.latestRevisionChecked, tc.expectLastRevisionChecked) + } + assert.Equal(t, tc.expectActions, tc.hasher.actions) + }) + } +} + type fakeHasher struct { peerHashes []*peerHashKVResp hashByRevIndex int hashByRevResponses []hashByRev linearizableReadNotify error + hashes []mvcc.KeyValueHash alarmTriggered bool actions []string @@ -251,8 +341,14 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6 return hashByRev.hash, hashByRev.revision, hashByRev.err } -func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) { - panic("not implemented") +func (f *fakeHasher) Store(hash mvcc.KeyValueHash) { + f.actions = append(f.actions, fmt.Sprintf("Store(%v)", hash)) + f.hashes = append(f.hashes, hash) +} + +func (f *fakeHasher) Hashes() []mvcc.KeyValueHash { + f.actions = append(f.actions, "Hashes()") + return f.hashes } func (f *fakeHasher) ReqTimeout() time.Duration { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 78c65df74..184947a5d 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -108,7 +108,8 @@ var ( // monitorVersionInterval should be smaller than the timeout // on the connection. Or we will not be able to reuse the connection // (since it will timeout). - monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second + monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second + CompactHashCheckInterval = 15 * time.Second recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes)) storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")) @@ -630,7 +631,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { ) } } - srv.corruptionChecker = NewCorruptionChecker(cfg.Logger, srv) + srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage()) srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) @@ -805,6 +806,7 @@ func (s *EtcdServer) Start() { s.GoAttach(s.monitorVersions) s.GoAttach(s.linearizableReadLoop) s.GoAttach(s.monitorKVHash) + s.GoAttach(s.monitorCompactHash) s.GoAttach(s.monitorDowngrade) } @@ -2537,6 +2539,20 @@ func (s *EtcdServer) monitorKVHash() { } } +func (s *EtcdServer) monitorCompactHash() { + for { + select { + case <-time.After(CompactHashCheckInterval): + case <-s.stopping: + return + } + if !s.isLeader() { + continue + } + s.corruptionChecker.CompactHashCheck() + } +} + func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() diff --git a/server/mvcc/hash.go b/server/mvcc/hash.go index fbc60abb4..11232bf56 100644 --- a/server/mvcc/hash.go +++ b/server/mvcc/hash.go @@ -93,6 +93,9 @@ type HashStorage interface { // Store adds hash value in local cache, allowing it can be returned by HashByRev. 
Store(valueHash KeyValueHash) + + // Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes. + Hashes() []KeyValueHash } type hashStorage struct { @@ -146,3 +149,14 @@ func (s *hashStorage) Store(hash KeyValueHash) { s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:] } } + +func (s *hashStorage) Hashes() []KeyValueHash { + s.hashMu.RLock() + // Copy out hashes under lock just to be safe + hashes := make([]KeyValueHash, 0, len(s.hashes)) + for _, hash := range s.hashes { + hashes = append(hashes, hash) + } + s.hashMu.RUnlock() + return hashes +} diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 8920f1c47..ef788d1c9 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/datadir" + "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" ) @@ -131,3 +132,47 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) { // TODO: Investigate why MemberID is 0? assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms) } + +func TestCompactHashCheckDetectCorruption(t *testing.T) { + BeforeTest(t) + epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{ + clusterSize: 3, + keepDataDir: true, + }) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + cc := NewEtcdctl(epc.EndpointsV3()) + + for i := 0; i < 10; i++ { + err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + members, err := cc.MemberList() + assert.NoError(t, err, "error on member list") + var memberID uint64 + for _, m := range members.Members { + if m.Name == epc.procs[0].Config().name { + memberID = m.ID + } + } + + epc.procs[0].Stop() + err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.procs[0].Config().dataDirPath)) + assert.NoError(t, err) + + err = epc.procs[0].Restart() + assert.NoError(t, err) + _, err = cc.Compact(5) + assert.NoError(t, err) + time.Sleep(etcdserver.CompactHashCheckInterval * 11 / 10) + alarmResponse, err := cc.AlarmList() + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms) +} diff --git a/tests/integration/corrupt_test.go b/tests/integration/corrupt_test.go index f32fe682b..80202f974 100644 --- a/tests/integration/corrupt_test.go +++ b/tests/integration/corrupt_test.go @@ -99,3 +99,76 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) { // TODO: Investigate why MemberID is 0? 
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms) } + +func TestCompactHashCheck(t *testing.T) { + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + var totalRevisions int64 = 1210 + var rev int64 + for ; rev < totalRevisions; rev += testutil.CompactionCycle { + testCompactionHash(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle) + } + testCompactionHash(ctx, t, cc, clus, rev, rev+totalRevisions) +} + +func testCompactionHash(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *ClusterV3, start, stop int64) { + for i := start; i <= stop; i++ { + if i%67 == 0 { + _, err := cc.Delete(ctx, testutil.PickKey(i+83)) + assert.NoError(t, err, "error on delete") + } else { + _, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + } + _, err := cc.Compact(ctx, stop) + assert.NoError(t, err, "error on compact (rev %v)", stop) + // Wait for compaction to be compacted + time.Sleep(50 * time.Millisecond) + + clus.Members[0].CorruptionChecker().CompactHashCheck() +} + +func TestCompactHashCheckDetectCorruption(t *testing.T) { + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + cc, err := clus.ClusterClient() + require.NoError(t, err) + + ctx := context.Background() + + for i := 0; i < 10; i++ { + _, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i)) + assert.NoError(t, err, "error on put") + } + + clus.Members[0].CorruptionChecker().CompactHashCheck() + clus.Members[0].Stop(t) + clus.WaitLeader(t) + + err = testutil.CorruptBBolt(clus.Members[0].BackendPath()) + assert.NoError(t, err) + + err = clus.Members[0].Restart(t) + assert.NoError(t, err) + _, err = cc.Compact(ctx, 5) + assert.NoError(t, err) + time.Sleep(50 * time.Millisecond) + leader := clus.WaitLeader(t) + clus.Members[leader].CorruptionChecker().CompactHashCheck() + time.Sleep(50 * time.Millisecond) + alarmResponse, err := cc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms) +}
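
Note (not part of the patch): the core of the new check is the selection rule in uncheckedRevisions, which sorts the locally stored compaction hashes from newest to oldest and stops at the last revision already verified, so repeated runs only revisit hashes recorded since the previous successful check. The sketch below reproduces that rule as a standalone, hedged illustration; the names revisionHash and selectUnchecked are hypothetical and exist only for this example.

package main

import (
	"fmt"
	"sort"
)

// revisionHash mirrors the fields of mvcc.KeyValueHash used by the check.
type revisionHash struct {
	Revision        int64
	CompactRevision int64
	Hash            uint32
}

// selectUnchecked returns the stored hashes that still need verification,
// ordered from newest to oldest, cutting off at the last revision already checked.
func selectUnchecked(hashes []revisionHash, lastChecked int64) []revisionHash {
	sort.Slice(hashes, func(i, j int) bool {
		return hashes[i].Revision > hashes[j].Revision
	})
	for i, h := range hashes {
		if h.Revision <= lastChecked {
			return hashes[:i]
		}
	}
	return hashes
}

func main() {
	hashes := []revisionHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}}
	// With revision 2 already verified, only revisions 4 and 3 are re-checked,
	// matching the "No peers, check new checked from largest to smallest" test case
	// in corrupt_test.go, which expects PeerHashByRev(4) followed by PeerHashByRev(3).
	fmt.Println(selectUnchecked(hashes, 2))
}

Checking newest-first is what lets CompactHashCheck record the highest fully verified revision in latestRevisionChecked and return early; older, already-covered revisions are then skipped on the next tick of monitorCompactHash.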