From 3565a822de0220d60d2cc422b6f0cb880d509d8c Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 31 Jan 2024 17:20:04 -0800 Subject: [PATCH] Add VerifyTxConsistency to backend. Signed-off-by: Siyuan Zhang Update server/storage/backend/verify.go Co-authored-by: Benjamin Wang Update server/storage/backend/verify.go Co-authored-by: Benjamin Wang --- etcdutl/go.mod | 1 + server/etcdserver/server.go | 2 ++ server/storage/backend/verify.go | 40 ++++++++++++++++++++++++++++++++ server/storage/schema/bucket.go | 2 ++ 4 files changed, 45 insertions(+) diff --git a/etcdutl/go.mod b/etcdutl/go.mod index 70ff9ba72..3cc3d64a2 100644 --- a/etcdutl/go.mod +++ b/etcdutl/go.mod @@ -44,6 +44,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/btree v1.1.2 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a1b51a171..f5239aee2 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -946,6 +946,7 @@ func (s *EtcdServer) Cleanup() { func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { s.applySnapshot(ep, apply) s.applyEntries(ep, apply) + backend.VerifyBackendConsistency(s.Backend(), s.Logger(), true, schema.AllBuckets...) proposalsApplied.Set(float64(ep.appliedi)) s.applyWait.Trigger(ep.appliedi) @@ -2272,6 +2273,7 @@ func (s *EtcdServer) monitorKVHash() { return case <-checkTicker.C: } + backend.VerifyBackendConsistency(s.be, lg, false, schema.AllBuckets...) if !s.isLeader() { continue } diff --git a/server/storage/backend/verify.go b/server/storage/backend/verify.go index c55279f81..fc43523a9 100644 --- a/server/storage/backend/verify.go +++ b/server/storage/backend/verify.go @@ -15,9 +15,11 @@ package backend import ( + "fmt" "runtime/debug" "strings" + "github.com/google/go-cmp/cmp" "go.uber.org/zap" "go.etcd.io/etcd/client/pkg/v3/verify" @@ -67,3 +69,41 @@ func insideUnittest() bool { stackTraceStr := string(debug.Stack()) return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/") } + +// VerifyBackendConsistency verifies data in ReadTx and BatchTx are consistent. +func VerifyBackendConsistency(b Backend, lg *zap.Logger, skipSafeRangeBucket bool, bucket ...Bucket) { + verify.Verify(func() { + if b == nil { + return + } + if lg != nil { + lg.Debug("verifyBackendConsistency", zap.Bool("skipSafeRangeBucket", skipSafeRangeBucket)) + } + b.BatchTx().LockOutsideApply() + defer b.BatchTx().Unlock() + b.ReadTx().RLock() + defer b.ReadTx().RUnlock() + for _, bkt := range bucket { + if skipSafeRangeBucket && bkt.IsSafeRangeBucket() { + continue + } + unsafeVerifyTxConsistency(b, bkt) + } + }) +} + +func unsafeVerifyTxConsistency(b Backend, bucket Bucket) { + dataFromWriteTxn := map[string]string{} + b.BatchTx().UnsafeForEach(bucket, func(k, v []byte) error { + dataFromWriteTxn[string(k)] = string(v) + return nil + }) + dataFromReadTxn := map[string]string{} + b.ReadTx().UnsafeForEach(bucket, func(k, v []byte) error { + dataFromReadTxn[string(k)] = string(v) + return nil + }) + if diff := cmp.Diff(dataFromWriteTxn, dataFromReadTxn); diff != "" { + panic(fmt.Sprintf("bucket %s data mismatch\nwrite TXN: %v\nread TXN: %v\ndiff: %s", bucket.String(), dataFromWriteTxn, dataFromReadTxn, diff)) + } +} diff --git a/server/storage/schema/bucket.go b/server/storage/schema/bucket.go index 5472af3c3..06da660df 100644 --- a/server/storage/schema/bucket.go +++ b/server/storage/schema/bucket.go @@ -54,6 +54,8 @@ var ( AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false}) Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false}) + + AllBuckets = []backend.Bucket{Key, Meta, Lease, Alarm, Cluster, Members, MembersRemoved, Auth, AuthUsers, AuthRoles} ) type bucket struct {