Merge pull request #17195 from siyuanfoundation/txBuf1
Fix delete inconsistencies in read buffer
commit b3bf59a355
@@ -11,6 +11,7 @@ require (
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
 	github.com/golang/protobuf v1.5.3
 	github.com/google/btree v1.1.2
+	github.com/google/go-cmp v0.6.0
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0
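The go-cmp dependency promoted above is used by the backend tests further down to compare range results from the read and batch transactions. A minimal standalone sketch of how cmp.Diff reports mismatches between byte-slice results (illustration only, not part of the change):

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
)

func main() {
	got := [][]byte{[]byte("foo")}
	want := [][]byte{[]byte("foo"), []byte("bar")}

	// cmp.Diff returns "" when the two values are deeply equal and a
	// human-readable diff otherwise, which is how the new tests below
	// assert that both transaction views return the same data.
	if diff := cmp.Diff(got, want); diff != "" {
		fmt.Printf("range results don't match, diff: %s\n", diff)
	}
}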
@@ -288,7 +288,8 @@ func (t *batchTx) commit(stop bool) {
 
 type batchTxBuffered struct {
 	batchTx
 	buf txWriteBuffer
+	pendingDeleteOperations int
 }
 
 func newBatchTxBuffered(backend *backend) *batchTxBuffered {
@@ -310,7 +311,27 @@ func (t *batchTxBuffered) Unlock() {
 		t.buf.writeback(&t.backend.readTx.buf)
 		// gofail: var afterWritebackBuf struct{}
 		t.backend.readTx.Unlock()
-		if t.pending >= t.backend.batchLimit {
+		// We commit the transaction when the number of pending operations
+		// reaches the configured limit (batchLimit) to prevent it from
+		// becoming excessively large.
+		//
+		// But we also need to commit the transaction immediately if there
+		// is any pending delete operation; otherwise etcd might not have
+		// finished committing the data into the backend storage (note: etcd
+		// periodically commits the bbolt transactions instead of committing
+		// on each request) by the time it applies the next request. In that
+		// case etcd may still read stale data from bbolt when processing the
+		// next request, which breaks linearizability.
+		//
+		// Note we don't need to commit the transaction for put requests if
+		// they don't exceed the batch limit, because there is a buffer on top
+		// of bbolt. Each time etcd reads data from the backend storage, it
+		// reads from both bbolt and the buffer. But there is no such buffer
+		// for delete requests.
+		//
+		// Please also refer to
+		// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
+		if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
 			t.commit(false)
 		}
 	}
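The comment above captures the core of the bug: reads served by the backend consult the write-through buffer first and then the last committed bbolt state, so an uncommitted put is visible immediately while an uncommitted delete is not. A minimal, self-contained sketch of that read path (toy types only, not etcd's real backend API) makes the stale-read scenario concrete:

package main

import "fmt"

// toyBackend is a heavily reduced model: a committed "bbolt" state plus a
// read buffer that only ever holds puts (there is no delete buffer).
type toyBackend struct {
	committed  map[string]string // last committed bbolt state
	buffer     map[string]string // read buffer: puts written back on Unlock
	pendingPut map[string]string // uncommitted puts in the batch tx
	pendingDel map[string]bool   // uncommitted deletes in the batch tx
}

func newToyBackend() *toyBackend {
	return &toyBackend{
		committed:  map[string]string{},
		buffer:     map[string]string{},
		pendingPut: map[string]string{},
		pendingDel: map[string]bool{},
	}
}

// put lands in both the batch tx and the read buffer, so it is immediately visible.
func (b *toyBackend) put(k, v string) { b.pendingPut[k] = v; b.buffer[k] = v }

// del only reaches the batch tx; nothing in the buffer records it.
func (b *toyBackend) del(k string) { delete(b.pendingPut, k); b.pendingDel[k] = true }

func (b *toyBackend) commit() {
	for k, v := range b.pendingPut {
		b.committed[k] = v
	}
	for k := range b.pendingDel {
		delete(b.committed, k)
	}
	b.pendingPut, b.pendingDel = map[string]string{}, map[string]bool{}
	b.buffer = map[string]string{} // buffer is reset against the new read tx
}

// read consults the buffer first, then the committed state: puts parked in
// the batch tx are visible, deletes are not until commit.
func (b *toyBackend) read(k string) (string, bool) {
	if v, ok := b.buffer[k]; ok {
		return v, true
	}
	v, ok := b.committed[k]
	return v, ok
}

func main() {
	b := newToyBackend()
	b.put("foo", "bar")
	b.commit()

	b.del("foo")
	fmt.Println(b.read("foo")) // "bar true": stale read, the inconsistency this PR targets

	b.commit() // what the fix forces whenever a delete is pending
	fmt.Println(b.read("foo")) // "false": consistent again
}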
@@ -356,6 +377,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
 	}
 
 	t.batchTx.commit(stop)
+	t.pendingDeleteOperations = 0
 
 	if !stop {
 		t.backend.readTx.tx = t.backend.begin(false)
@@ -371,3 +393,13 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
 	t.batchTx.UnsafeSeqPut(bucket, key, value)
 	t.buf.putSeq(bucket, key, value)
 }
+
+func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
+	t.batchTx.UnsafeDelete(bucketType, key)
+	t.pendingDeleteOperations++
+}
+
+func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
+	t.batchTx.UnsafeDeleteBucket(bucket)
+	t.pendingDeleteOperations++
+}
@@ -19,6 +19,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+
 	bolt "go.etcd.io/bbolt"
 	"go.etcd.io/etcd/server/v3/storage/backend"
 	betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
@@ -205,3 +207,90 @@ func TestBatchTxBatchLimitCommit(t *testing.T) {
 		return nil
 	})
 }
+
+func TestRangeAfterDeleteBucketMatch(t *testing.T) {
+	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
+	defer betesting.Close(t, b)
+
+	tx := b.BatchTx()
+
+	tx.Lock()
+	tx.UnsafeCreateBucket(schema.Test)
+	tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
+	tx.Unlock()
+	tx.Commit()
+
+	checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})
+
+	tx.Lock()
+	tx.UnsafeDeleteBucket(schema.Test)
+	tx.Unlock()
+
+	checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
+}
+
+func TestRangeAfterDeleteMatch(t *testing.T) {
+	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
+	defer betesting.Close(t, b)
+
+	tx := b.BatchTx()
+
+	tx.Lock()
+	tx.UnsafeCreateBucket(schema.Test)
+	tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
+	tx.Unlock()
+	tx.Commit()
+
+	checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
+	checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})
+
+	tx.Lock()
+	tx.UnsafeDelete(schema.Test, []byte("foo"))
+	tx.Unlock()
+
+	checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
+	checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
+}
+
+func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
+	tx.Lock()
+	ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit)
+	tx.Unlock()
+
+	rtx.RLock()
+	ks2, vs2 := rtx.UnsafeRange(schema.Test, key, endKey, limit)
+	rtx.RUnlock()
+
+	if diff := cmp.Diff(ks1, ks2); diff != "" {
+		t.Errorf("keys on read and batch transaction don't match, diff: %s", diff)
+	}
+	if diff := cmp.Diff(vs1, vs2); diff != "" {
+		t.Errorf("values on read and batch transaction don't match, diff: %s", diff)
+	}
+}
+
+func checkForEach(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, expectedKeys, expectedValues [][]byte) {
+	tx.Lock()
+	checkUnsafeForEach(t, tx, expectedKeys, expectedValues)
+	tx.Unlock()
+
+	rtx.RLock()
+	checkUnsafeForEach(t, rtx, expectedKeys, expectedValues)
+	rtx.RUnlock()
+}
+
+func checkUnsafeForEach(t *testing.T, tx backend.UnsafeReader, expectedKeys, expectedValues [][]byte) {
+	var ks, vs [][]byte
+	tx.UnsafeForEach(schema.Test, func(k, v []byte) error {
+		ks = append(ks, k)
+		vs = append(vs, v)
+		return nil
+	})
+
+	if diff := cmp.Diff(ks, expectedKeys); diff != "" {
+		t.Errorf("keys on transaction don't match expected, diff: %s", diff)
+	}
+	if diff := cmp.Diff(vs, expectedValues); diff != "" {
+		t.Errorf("values on transaction don't match expected, diff: %s", diff)
+	}
+}
@@ -142,7 +142,8 @@ type ClusterConfig struct {
 	AuthToken    string
 	AuthTokenTTL uint
 
 	QuotaBackendBytes    int64
+	BackendBatchInterval time.Duration
 
 	MaxTxnOps       uint
 	MaxRequestBytes uint
@@ -271,6 +272,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
 		PeerTLS:              c.Cfg.PeerTLS,
 		ClientTLS:            c.Cfg.ClientTLS,
 		QuotaBackendBytes:    c.Cfg.QuotaBackendBytes,
+		BackendBatchInterval: c.Cfg.BackendBatchInterval,
 		MaxTxnOps:            c.Cfg.MaxTxnOps,
 		MaxRequestBytes:      c.Cfg.MaxRequestBytes,
 		SnapshotCount:        c.Cfg.SnapshotCount,
@@ -598,6 +600,7 @@ type MemberConfig struct {
 	AuthToken         string
 	AuthTokenTTL      uint
 	QuotaBackendBytes int64
+	BackendBatchInterval time.Duration
 	MaxTxnOps         uint
 	MaxRequestBytes   uint
 	SnapshotCount     uint64
@@ -671,6 +674,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
 	m.TickMs = uint(framecfg.TickDuration / time.Millisecond)
 	m.PreVote = true
 	m.QuotaBackendBytes = mcfg.QuotaBackendBytes
+	m.BackendBatchInterval = mcfg.BackendBatchInterval
 	m.MaxTxnOps = mcfg.MaxTxnOps
 	if m.MaxTxnOps == 0 {
 		m.MaxTxnOps = embed.DefaultMaxTxnOps
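The BackendBatchInterval knob threaded through the cluster framework above lets a test effectively disable the periodic backend commit, so the only things keeping reads consistent are the read buffer and the forced commits added in this change. A hedged sketch of how a test might opt into it (the test name, package clause, and body are illustrative only; the framework identifiers are the ones added above):

package integration_test

import (
	"testing"
	"time"

	"go.etcd.io/etcd/tests/v3/framework/integration"
)

// Illustrative only: with a very large BackendBatchInterval the periodic
// commit never fires during the test, so any stale-read behavior around
// deletes has time to surface; TestRemoveMember below uses the same setup.
func TestWithLargeBatchInterval(t *testing.T) {
	integration.BeforeTest(t)

	clus := integration.NewCluster(t, &integration.ClusterConfig{
		Size:                 1,
		BackendBatchInterval: 1000 * time.Second,
	})
	defer clus.Terminate(t)

	// ... exercise deletes and subsequent reads here ...
}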
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
 
 	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@@ -55,6 +56,56 @@ func TestUserError(t *testing.T) {
 	}
 }
 
+func TestAddUserAfterDelete(t *testing.T) {
+	integration2.BeforeTest(t)
+
+	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	authapi := clus.RandClient()
+	authSetupRoot(t, authapi.Auth)
+	cfg := clientv3.Config{
+		Endpoints:   authapi.Endpoints(),
+		DialTimeout: 5 * time.Second,
+		DialOptions: []grpc.DialOption{grpc.WithBlock()},
+	}
+	cfg.Username, cfg.Password = "root", "123"
+	authed, err := integration2.NewClient(t, cfg)
+	require.NoError(t, err)
+	defer authed.Close()
+
+	// add user
+	_, err = authed.UserAdd(context.TODO(), "foo", "bar")
+	require.NoError(t, err)
+	_, err = authapi.Authenticate(context.TODO(), "foo", "bar")
+	require.NoError(t, err)
+	// delete user
+	_, err = authed.UserDelete(context.TODO(), "foo")
+	require.NoError(t, err)
+	if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil {
+		t.Errorf("expect Authenticate error for old password")
+	}
+	// add user back
+	_, err = authed.UserAdd(context.TODO(), "foo", "bar")
+	require.NoError(t, err)
+	_, err = authed.Authenticate(context.TODO(), "foo", "bar")
+	require.NoError(t, err)
+	// change password
+	_, err = authed.UserChangePassword(context.TODO(), "foo", "bar2")
+	require.NoError(t, err)
+	_, err = authed.UserChangePassword(context.TODO(), "foo", "bar1")
+	require.NoError(t, err)
+
+	if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil {
+		t.Errorf("expect Authenticate error for old password")
+	}
+	if _, err = authed.Authenticate(context.TODO(), "foo", "bar2"); err == nil {
+		t.Errorf("expect Authenticate error for old password")
+	}
+	_, err = authed.Authenticate(context.TODO(), "foo", "bar1")
+	require.NoError(t, err)
+}
+
 func TestUserErrorAuth(t *testing.T) {
 	integration2.BeforeTest(t)
 
@@ -18,9 +18,13 @@ import (
 	"context"
 	"fmt"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"go.etcd.io/etcd/server/v3/etcdserver"
 	"go.etcd.io/etcd/server/v3/storage/schema"
 	"go.etcd.io/etcd/tests/v3/framework/integration"
 )
 
@@ -115,3 +119,30 @@ func TestSnapshotAndRestartMember(t *testing.T) {
 		}
 	}
 }
+
+func TestRemoveMember(t *testing.T) {
+	integration.BeforeTest(t)
+	c := integration.NewCluster(t, &integration.ClusterConfig{Size: 3, UseBridge: true, BackendBatchInterval: 1000 * time.Second})
+	defer c.Terminate(t)
+	// membership changes additionally require the cluster to be stable for etcdserver.HealthInterval
+	time.Sleep(etcdserver.HealthInterval)
+
+	err := c.RemoveMember(t, c.Client(2), uint64(c.Members[0].ID()))
+	require.NoError(t, err)
+
+	checkMemberCount(t, c.Members[0], 2)
+	checkMemberCount(t, c.Members[1], 2)
+}
+
+func checkMemberCount(t *testing.T, m *integration.Member, expectedMemberCount int) {
+	be := schema.NewMembershipBackend(m.Logger, m.Server.Backend())
+	membersFromBackend, _ := be.MustReadMembersFromBackend()
+	if len(membersFromBackend) != expectedMemberCount {
+		t.Errorf("Expect member count read from backend=%d, got %d", expectedMemberCount, len(membersFromBackend))
+	}
+	membersResp, err := m.Client.MemberList(context.Background())
+	require.NoError(t, err)
+	if len(membersResp.Members) != expectedMemberCount {
+		t.Errorf("Expect len(MemberList)=%d, got %d", expectedMemberCount, len(membersResp.Members))
+	}
+}