diff --git a/server/go.mod b/server/go.mod index 0af58699b..548a738c4 100644 --- a/server/go.mod +++ b/server/go.mod @@ -11,6 +11,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/golang/protobuf v1.5.3 github.com/google/btree v1.1.2 + github.com/google/go-cmp v0.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 60be4ce6d..c62d2af86 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -288,7 +288,8 @@ func (t *batchTx) commit(stop bool) { type batchTxBuffered struct { batchTx - buf txWriteBuffer + buf txWriteBuffer + pendingDeleteOperations int } func newBatchTxBuffered(backend *backend) *batchTxBuffered { @@ -310,7 +311,27 @@ func (t *batchTxBuffered) Unlock() { t.buf.writeback(&t.backend.readTx.buf) // gofail: var afterWritebackBuf struct{} t.backend.readTx.Unlock() - if t.pending >= t.backend.batchLimit { + // We commit the transaction when the number of pending operations + // reaches the configured limit (batchLimit) to prevent it from + // becoming excessively large. + // + // But we also need to commit the transaction immediately if there + // is any pending delete operation, otherwise etcd might run into + // a situation where it hasn't finished committing the data into backend + // storage (note: etcd periodically commits the bbolt transactions + // instead of on each request) when it applies the next request. Accordingly, + // etcd may still read the stale data from bbolt when processing the next + // request. That would break linearizability. + // + // Note we don't need to commit the transaction for put requests if + // it doesn't exceed the batch limit, because there is a buffer on top + // of the bbolt. 
Each time etcd reads data from backend storage, + // it will read data from both bbolt and the buffer. But there is no + // such buffer for delete requests. + // + // Please also refer to + // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 + if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { t.commit(false) } } @@ -356,6 +377,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } t.batchTx.commit(stop) + t.pendingDeleteOperations = 0 if !stop { t.backend.readTx.tx = t.backend.begin(false) @@ -371,3 +393,13 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) t.batchTx.UnsafeSeqPut(bucket, key, value) t.buf.putSeq(bucket, key, value) } + +func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) { + t.batchTx.UnsafeDelete(bucketType, key) + t.pendingDeleteOperations++ +} + +func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) { + t.batchTx.UnsafeDeleteBucket(bucket) + t.pendingDeleteOperations++ +} diff --git a/server/storage/backend/batch_tx_test.go b/server/storage/backend/batch_tx_test.go index 6fd2bbae6..cc099d1f9 100644 --- a/server/storage/backend/batch_tx_test.go +++ b/server/storage/backend/batch_tx_test.go @@ -19,6 +19,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/server/v3/storage/backend" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" @@ -205,3 +207,90 @@ func TestBatchTxBatchLimitCommit(t *testing.T) { return nil }) } + +func TestRangeAfterDeleteBucketMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) + tx.Unlock() + tx.Commit() + + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")}) + + tx.Lock() + tx.UnsafeDeleteBucket(schema.Test) + tx.Unlock() + + 
checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) +} + +func TestRangeAfterDeleteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(schema.Test) + tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar")) + tx.Unlock() + tx.Commit() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")}) + + tx.Lock() + tx.UnsafeDelete(schema.Test, []byte("foo")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) +} + +func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) { + tx.Lock() + ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit) + tx.Unlock() + + rtx.RLock() + ks2, vs2 := rtx.UnsafeRange(schema.Test, key, endKey, limit) + rtx.RUnlock() + + if diff := cmp.Diff(ks1, ks2); diff != "" { + t.Errorf("keys on read and batch transaction doesn't match, diff: %s", diff) + } + if diff := cmp.Diff(vs1, vs2); diff != "" { + t.Errorf("values on read and batch transaction doesn't match, diff: %s", diff) + } +} + +func checkForEach(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, expectedKeys, expectedValues [][]byte) { + tx.Lock() + checkUnsafeForEach(t, tx, expectedKeys, expectedValues) + tx.Unlock() + + rtx.RLock() + checkUnsafeForEach(t, rtx, expectedKeys, expectedValues) + rtx.RUnlock() +} + +func checkUnsafeForEach(t *testing.T, tx backend.UnsafeReader, expectedKeys, expectedValues [][]byte) { + var ks, vs [][]byte + tx.UnsafeForEach(schema.Test, func(k, v []byte) error { + ks = append(ks, k) + vs = append(vs, v) + return nil + }) + + if diff := cmp.Diff(ks, expectedKeys); diff != "" { + t.Errorf("keys on transaction doesn't match expected, diff: %s", diff) + } + if diff := cmp.Diff(vs, 
expectedValues); diff != "" { + t.Errorf("values on transaction doesn't match expected, diff: %s", diff) + } +} diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index b34212e79..9d50321ca 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -142,7 +142,8 @@ type ClusterConfig struct { AuthToken string AuthTokenTTL uint - QuotaBackendBytes int64 + QuotaBackendBytes int64 + BackendBatchInterval time.Duration MaxTxnOps uint MaxRequestBytes uint @@ -271,6 +272,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member { PeerTLS: c.Cfg.PeerTLS, ClientTLS: c.Cfg.ClientTLS, QuotaBackendBytes: c.Cfg.QuotaBackendBytes, + BackendBatchInterval: c.Cfg.BackendBatchInterval, MaxTxnOps: c.Cfg.MaxTxnOps, MaxRequestBytes: c.Cfg.MaxRequestBytes, SnapshotCount: c.Cfg.SnapshotCount, @@ -598,6 +600,7 @@ type MemberConfig struct { AuthToken string AuthTokenTTL uint QuotaBackendBytes int64 + BackendBatchInterval time.Duration MaxTxnOps uint MaxRequestBytes uint SnapshotCount uint64 @@ -671,6 +674,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { m.TickMs = uint(framecfg.TickDuration / time.Millisecond) m.PreVote = true m.QuotaBackendBytes = mcfg.QuotaBackendBytes + m.BackendBatchInterval = mcfg.BackendBatchInterval m.MaxTxnOps = mcfg.MaxTxnOps if m.MaxTxnOps == 0 { m.MaxTxnOps = embed.DefaultMaxTxnOps diff --git a/tests/integration/clientv3/user_test.go b/tests/integration/clientv3/user_test.go index a6698d32e..49b4c18ea 100644 --- a/tests/integration/clientv3/user_test.go +++ b/tests/integration/clientv3/user_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -55,6 +56,56 @@ func TestUserError(t *testing.T) { } } +func TestAddUserAfterDelete(t *testing.T) { + integration2.BeforeTest(t) + + clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1}) + defer 
clus.Terminate(t) + + authapi := clus.RandClient() + authSetupRoot(t, authapi.Auth) + cfg := clientv3.Config{ + Endpoints: authapi.Endpoints(), + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + cfg.Username, cfg.Password = "root", "123" + authed, err := integration2.NewClient(t, cfg) + require.NoError(t, err) + defer authed.Close() + + // add user + _, err = authed.UserAdd(context.TODO(), "foo", "bar") + require.NoError(t, err) + _, err = authapi.Authenticate(context.TODO(), "foo", "bar") + require.NoError(t, err) + // delete user + _, err = authed.UserDelete(context.TODO(), "foo") + require.NoError(t, err) + if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + // add user back + _, err = authed.UserAdd(context.TODO(), "foo", "bar") + require.NoError(t, err) + _, err = authed.Authenticate(context.TODO(), "foo", "bar") + require.NoError(t, err) + // change password + _, err = authed.UserChangePassword(context.TODO(), "foo", "bar2") + require.NoError(t, err) + _, err = authed.UserChangePassword(context.TODO(), "foo", "bar1") + require.NoError(t, err) + + if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + if _, err = authed.Authenticate(context.TODO(), "foo", "bar2"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + _, err = authed.Authenticate(context.TODO(), "foo", "bar1") + require.NoError(t, err) +} + func TestUserErrorAuth(t *testing.T) { integration2.BeforeTest(t) diff --git a/tests/integration/member_test.go b/tests/integration/member_test.go index 6581b654b..efd6598f6 100644 --- a/tests/integration/member_test.go +++ b/tests/integration/member_test.go @@ -18,9 +18,13 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + 
"go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/tests/v3/framework/integration" ) @@ -115,3 +119,30 @@ func TestSnapshotAndRestartMember(t *testing.T) { } } } + +func TestRemoveMember(t *testing.T) { + integration.BeforeTest(t) + c := integration.NewCluster(t, &integration.ClusterConfig{Size: 3, UseBridge: true, BackendBatchInterval: 1000 * time.Second}) + defer c.Terminate(t) + // membership changes additionally require cluster to be stable for etcdserver.HealthInterval + time.Sleep(etcdserver.HealthInterval) + + err := c.RemoveMember(t, c.Client(2), uint64(c.Members[0].ID())) + require.NoError(t, err) + + checkMemberCount(t, c.Members[0], 2) + checkMemberCount(t, c.Members[1], 2) +} + +func checkMemberCount(t *testing.T, m *integration.Member, expectedMemberCount int) { + be := schema.NewMembershipBackend(m.Logger, m.Server.Backend()) + membersFromBackend, _ := be.MustReadMembersFromBackend() + if len(membersFromBackend) != expectedMemberCount { + t.Errorf("Expect member count read from backend=%d, got %d", expectedMemberCount, len(membersFromBackend)) + } + membersResp, err := m.Client.MemberList(context.Background()) + require.NoError(t, err) + if len(membersResp.Members) != expectedMemberCount { + t.Errorf("Expect len(MemberList)=%d, got %d", expectedMemberCount, len(membersResp.Members)) + } +}