From c3af9427ed4ff53aaf1807c976b79e56783dda9d Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Fri, 12 Jan 2024 09:14:01 -0800 Subject: [PATCH 1/2] add tests to test tx delete consistency. Signed-off-by: Siyuan Zhang --- clientv3/integration/user_test.go | 51 ++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + mvcc/backend/batch_tx_test.go | 66 +++++++++++++++++++++++++++++++ 4 files changed, 120 insertions(+) diff --git a/clientv3/integration/user_test.go b/clientv3/integration/user_test.go index e49c56651..26120cdf2 100644 --- a/clientv3/integration/user_test.go +++ b/clientv3/integration/user_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/integration" @@ -55,6 +56,56 @@ func TestUserError(t *testing.T) { } } +func TestAddUserAfterDelete(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + authapi := clus.RandClient() + authSetupRoot(t, authapi.Auth) + cfg := clientv3.Config{ + Endpoints: authapi.Endpoints(), + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + cfg.Username, cfg.Password = "root", "123" + authed, err := clientv3.New(cfg) + require.NoError(t, err) + defer authed.Close() + + // add user + _, err = authed.UserAdd(context.TODO(), "foo", "bar") + require.NoError(t, err) + _, err = authapi.Authenticate(context.TODO(), "foo", "bar") + require.NoError(t, err) + // delete user + _, err = authed.UserDelete(context.TODO(), "foo") + require.NoError(t, err) + if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + // add user back + _, err = authed.UserAdd(context.TODO(), "foo", "bar") + require.NoError(t, err) + _, err = authed.Authenticate(context.TODO(), "foo", "bar") + require.NoError(t, err) + // change password + _, err = authed.UserChangePassword(context.TODO(), "foo", "bar2") + require.NoError(t, err) + _, err = authed.UserChangePassword(context.TODO(), "foo", "bar1") + require.NoError(t, err) + + if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + if _, err = authed.Authenticate(context.TODO(), "foo", "bar2"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + _, err = authed.Authenticate(context.TODO(), "foo", "bar1") + require.NoError(t, err) +} + func TestUserErrorAuth(t *testing.T) { defer testutil.AfterTest(t) diff --git a/go.mod b/go.mod index 68b388230..8cf4a57db 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/protobuf v1.5.3 github.com/google/btree v1.0.0 + github.com/google/go-cmp v0.5.9 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.11.0 diff --git a/go.sum b/go.sum index 32658096a..b421e7710 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/mvcc/backend/batch_tx_test.go b/mvcc/backend/batch_tx_test.go index a3bb6774e..5661b6659 100644 --- a/mvcc/backend/batch_tx_test.go +++ b/mvcc/backend/batch_tx_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" bolt "go.etcd.io/bbolt" ) @@ -195,3 +196,68 @@ func TestBatchTxBatchLimitCommit(t *testing.T) { return nil }) } + +func TestRangeAfterDeleteMatch(t *testing.T) { + b, tmpPath := NewTmpBackend(time.Hour, 10000) + defer cleanup(b, tmpPath) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.Unlock() + tx.Commit() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")}) + tx.Lock() + tx.UnsafeDelete([]byte("test"), []byte("foo")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) +} + +func checkRangeResponseMatch(t *testing.T, tx BatchTx, rtx ReadTx, key, endKey []byte, limit int64) { + tx.Lock() + ks1, vs1 := tx.UnsafeRange([]byte("test"), key, endKey, limit) + tx.Unlock() + + rtx.RLock() + ks2, vs2 := rtx.UnsafeRange([]byte("test"), key, endKey, limit) + rtx.RUnlock() + + if diff := cmp.Diff(ks1, ks2); diff != "" { + t.Errorf("keys on read and batch transaction doesn't match, diff: %s", diff) + } + if diff := cmp.Diff(vs1, vs2); diff != "" { + t.Errorf("values on read and batch transaction doesn't match, diff: %s", diff) + } +} + +func checkForEach(t *testing.T, tx BatchTx, rtx ReadTx, expectedKeys, expectedValues [][]byte) { + tx.Lock() + checkUnsafeForEach(t, tx, expectedKeys, expectedValues) + tx.Unlock() + + rtx.RLock() + checkUnsafeForEach(t, rtx, expectedKeys, expectedValues) + rtx.RUnlock() +} + +func checkUnsafeForEach(t *testing.T, tx ReadTx, expectedKeys, expectedValues [][]byte) { + var ks, vs [][]byte + tx.UnsafeForEach([]byte("test"), func(k, v []byte) error { + ks = append(ks, k) + vs = append(vs, v) + return nil + }) + + if diff := cmp.Diff(ks, expectedKeys); diff != "" { + t.Errorf("keys on transaction doesn't match expected, diff: %s", diff) + } + if diff := cmp.Diff(vs, expectedValues); diff != "" { + t.Errorf("values on transaction doesn't match expected, diff: %s", diff) + } +} From b70684b93dab2a83951428a7c107d943c8da7399 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Fri, 12 Jan 2024 09:16:49 -0800 Subject: [PATCH 2/2] commit bbolt transaction if there is any pending deleting operations Signed-off-by: Siyuan Zhang --- mvcc/backend/batch_tx.go | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index adebe7d14..2931c04e8 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -258,7 +258,8 @@ func (t *batchTx) commit(stop bool) { type batchTxBuffered struct { batchTx - buf txWriteBuffer + buf txWriteBuffer + pendingDeleteOperations int } func newBatchTxBuffered(backend *backend) *batchTxBuffered { @@ -279,7 +280,27 @@ func (t *batchTxBuffered) Unlock() { // gofail: var beforeWritebackBuf struct{} t.buf.writeback(&t.backend.readTx.buf) t.backend.readTx.Unlock() - if t.pending >= t.backend.batchLimit { + // We commit the transaction when the number of pending operations + // reaches the configured limit(batchLimit) to prevent it from + // becoming excessively large. + // + // But we also need to commit the transaction immediately if there + // is any pending deleting operation, otherwise etcd might run into + // a situation that it haven't finished committing the data into backend + // storage (note: etcd periodically commits the bbolt transactions + // instead of on each request) when it applies next request. Accordingly, + // etcd may still read the stale data from bbolt when processing next + // request. So it breaks the linearizability. + // + // Note we don't need to commit the transaction for put requests if + // it doesn't exceed the batch limit, because there is a buffer on top + // of the bbolt. Each time when etcd reads data from backend storage, + // it will read data from both bbolt and the buffer. But there is no + // such a buffer for delete requests. + // + // Please also refer to + // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 + if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { t.commit(false) } } @@ -323,6 +344,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } t.batchTx.commit(stop) + t.pendingDeleteOperations = 0 if !stop { t.backend.readTx.tx = t.backend.begin(false) @@ -338,3 +360,8 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []by t.batchTx.UnsafeSeqPut(bucketName, key, value) t.buf.putSeq(bucketName, key, value) } + +func (t *batchTxBuffered) UnsafeDelete(bucketName []byte, key []byte) { + t.batchTx.UnsafeDelete(bucketName, key) + t.pendingDeleteOperations++ +}