mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17359 from siyuanfoundation/verify-test
Add VerifyTxConsistency to backend.
This commit is contained in:
commit
8c7f911b3b
@ -44,6 +44,7 @@ require (
|
||||
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/btree v1.1.2 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/jonboulle/clockwork v0.4.0 // indirect
|
||||
|
@ -946,6 +946,7 @@ func (s *EtcdServer) Cleanup() {
|
||||
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
|
||||
s.applySnapshot(ep, apply)
|
||||
s.applyEntries(ep, apply)
|
||||
backend.VerifyBackendConsistency(s.Backend(), s.Logger(), true, schema.AllBuckets...)
|
||||
|
||||
proposalsApplied.Set(float64(ep.appliedi))
|
||||
s.applyWait.Trigger(ep.appliedi)
|
||||
@ -2272,6 +2273,7 @@ func (s *EtcdServer) monitorKVHash() {
|
||||
return
|
||||
case <-checkTicker.C:
|
||||
}
|
||||
backend.VerifyBackendConsistency(s.be, lg, false, schema.AllBuckets...)
|
||||
if !s.isLeader() {
|
||||
continue
|
||||
}
|
||||
|
@ -15,9 +15,11 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||
@ -67,3 +69,41 @@ func insideUnittest() bool {
|
||||
stackTraceStr := string(debug.Stack())
|
||||
return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
|
||||
}
|
||||
|
||||
// VerifyBackendConsistency verifies data in ReadTx and BatchTx are consistent.
|
||||
func VerifyBackendConsistency(b Backend, lg *zap.Logger, skipSafeRangeBucket bool, bucket ...Bucket) {
|
||||
verify.Verify(func() {
|
||||
if b == nil {
|
||||
return
|
||||
}
|
||||
if lg != nil {
|
||||
lg.Debug("verifyBackendConsistency", zap.Bool("skipSafeRangeBucket", skipSafeRangeBucket))
|
||||
}
|
||||
b.BatchTx().LockOutsideApply()
|
||||
defer b.BatchTx().Unlock()
|
||||
b.ReadTx().RLock()
|
||||
defer b.ReadTx().RUnlock()
|
||||
for _, bkt := range bucket {
|
||||
if skipSafeRangeBucket && bkt.IsSafeRangeBucket() {
|
||||
continue
|
||||
}
|
||||
unsafeVerifyTxConsistency(b, bkt)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func unsafeVerifyTxConsistency(b Backend, bucket Bucket) {
|
||||
dataFromWriteTxn := map[string]string{}
|
||||
b.BatchTx().UnsafeForEach(bucket, func(k, v []byte) error {
|
||||
dataFromWriteTxn[string(k)] = string(v)
|
||||
return nil
|
||||
})
|
||||
dataFromReadTxn := map[string]string{}
|
||||
b.ReadTx().UnsafeForEach(bucket, func(k, v []byte) error {
|
||||
dataFromReadTxn[string(k)] = string(v)
|
||||
return nil
|
||||
})
|
||||
if diff := cmp.Diff(dataFromWriteTxn, dataFromReadTxn); diff != "" {
|
||||
panic(fmt.Sprintf("bucket %s data mismatch\nwrite TXN: %v\nread TXN: %v\ndiff: %s", bucket.String(), dataFromWriteTxn, dataFromReadTxn, diff))
|
||||
}
|
||||
}
|
||||
|
@ -54,6 +54,8 @@ var (
|
||||
AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
|
||||
|
||||
Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
|
||||
|
||||
AllBuckets = []backend.Bucket{Key, Meta, Lease, Alarm, Cluster, Members, MembersRemoved, Auth, AuthUsers, AuthRoles}
|
||||
)
|
||||
|
||||
type bucket struct {
|
||||
|
Loading…
x
Reference in New Issue
Block a user