Merge pull request #17238 from siyuanfoundation/txBuf-3.4

[3.4] Fix delete inconsistencies in read buffer
This commit is contained in:
Benjamin Wang 2024-01-14 18:50:35 +00:00 committed by GitHub
commit 6caef6ca48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 149 additions and 2 deletions

View File

@ -19,6 +19,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/integration" "go.etcd.io/etcd/integration"
@ -55,6 +56,56 @@ func TestUserError(t *testing.T) {
} }
} }
func TestAddUserAfterDelete(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
authapi := clus.RandClient()
authSetupRoot(t, authapi.Auth)
cfg := clientv3.Config{
Endpoints: authapi.Endpoints(),
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
cfg.Username, cfg.Password = "root", "123"
authed, err := clientv3.New(cfg)
require.NoError(t, err)
defer authed.Close()
// add user
_, err = authed.UserAdd(context.TODO(), "foo", "bar")
require.NoError(t, err)
_, err = authapi.Authenticate(context.TODO(), "foo", "bar")
require.NoError(t, err)
// delete user
_, err = authed.UserDelete(context.TODO(), "foo")
require.NoError(t, err)
if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil {
t.Errorf("expect Authenticate error for old password")
}
// add user back
_, err = authed.UserAdd(context.TODO(), "foo", "bar")
require.NoError(t, err)
_, err = authed.Authenticate(context.TODO(), "foo", "bar")
require.NoError(t, err)
// change password
_, err = authed.UserChangePassword(context.TODO(), "foo", "bar2")
require.NoError(t, err)
_, err = authed.UserChangePassword(context.TODO(), "foo", "bar1")
require.NoError(t, err)
if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil {
t.Errorf("expect Authenticate error for old password")
}
if _, err = authed.Authenticate(context.TODO(), "foo", "bar2"); err == nil {
t.Errorf("expect Authenticate error for old password")
}
_, err = authed.Authenticate(context.TODO(), "foo", "bar1")
require.NoError(t, err)
}
func TestUserErrorAuth(t *testing.T) { func TestUserErrorAuth(t *testing.T) {
defer testutil.AfterTest(t) defer testutil.AfterTest(t)

1
go.mod
View File

@ -15,6 +15,7 @@ require (
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/protobuf v1.5.3 github.com/golang/protobuf v1.5.3
github.com/google/btree v1.0.0 github.com/google/btree v1.0.0
github.com/google/go-cmp v0.5.9
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.11.0 github.com/grpc-ecosystem/grpc-gateway v1.11.0

2
go.sum
View File

@ -84,6 +84,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

View File

@ -259,6 +259,7 @@ func (t *batchTx) commit(stop bool) {
type batchTxBuffered struct { type batchTxBuffered struct {
batchTx batchTx
buf txWriteBuffer buf txWriteBuffer
pendingDeleteOperations int
} }
func newBatchTxBuffered(backend *backend) *batchTxBuffered { func newBatchTxBuffered(backend *backend) *batchTxBuffered {
@ -279,7 +280,27 @@ func (t *batchTxBuffered) Unlock() {
// gofail: var beforeWritebackBuf struct{} // gofail: var beforeWritebackBuf struct{}
t.buf.writeback(&t.backend.readTx.buf) t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock() t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit { // We commit the transaction when the number of pending operations
// reaches the configured limit(batchLimit) to prevent it from
// becoming excessively large.
//
// But we also need to commit the transaction immediately if there
// is any pending deleting operation, otherwise etcd might run into
// a situation that it haven't finished committing the data into backend
// storage (note: etcd periodically commits the bbolt transactions
// instead of on each request) when it applies next request. Accordingly,
// etcd may still read the stale data from bbolt when processing next
// request. So it breaks the linearizability.
//
// Note we don't need to commit the transaction for put requests if
// it doesn't exceed the batch limit, because there is a buffer on top
// of the bbolt. Each time when etcd reads data from backend storage,
// it will read data from both bbolt and the buffer. But there is no
// such a buffer for delete requests.
//
// Please also refer to
// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false) t.commit(false)
} }
} }
@ -323,6 +344,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
} }
t.batchTx.commit(stop) t.batchTx.commit(stop)
t.pendingDeleteOperations = 0
if !stop { if !stop {
t.backend.readTx.tx = t.backend.begin(false) t.backend.readTx.tx = t.backend.begin(false)
@ -338,3 +360,8 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []by
t.batchTx.UnsafeSeqPut(bucketName, key, value) t.batchTx.UnsafeSeqPut(bucketName, key, value)
t.buf.putSeq(bucketName, key, value) t.buf.putSeq(bucketName, key, value)
} }
func (t *batchTxBuffered) UnsafeDelete(bucketName []byte, key []byte) {
t.batchTx.UnsafeDelete(bucketName, key)
t.pendingDeleteOperations++
}

View File

@ -19,6 +19,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
) )
@ -195,3 +196,68 @@ func TestBatchTxBatchLimitCommit(t *testing.T) {
return nil return nil
}) })
} }
func TestRangeAfterDeleteMatch(t *testing.T) {
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})
tx.Lock()
tx.UnsafeDelete([]byte("test"), []byte("foo"))
tx.Unlock()
checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
}
func checkRangeResponseMatch(t *testing.T, tx BatchTx, rtx ReadTx, key, endKey []byte, limit int64) {
tx.Lock()
ks1, vs1 := tx.UnsafeRange([]byte("test"), key, endKey, limit)
tx.Unlock()
rtx.RLock()
ks2, vs2 := rtx.UnsafeRange([]byte("test"), key, endKey, limit)
rtx.RUnlock()
if diff := cmp.Diff(ks1, ks2); diff != "" {
t.Errorf("keys on read and batch transaction doesn't match, diff: %s", diff)
}
if diff := cmp.Diff(vs1, vs2); diff != "" {
t.Errorf("values on read and batch transaction doesn't match, diff: %s", diff)
}
}
func checkForEach(t *testing.T, tx BatchTx, rtx ReadTx, expectedKeys, expectedValues [][]byte) {
tx.Lock()
checkUnsafeForEach(t, tx, expectedKeys, expectedValues)
tx.Unlock()
rtx.RLock()
checkUnsafeForEach(t, rtx, expectedKeys, expectedValues)
rtx.RUnlock()
}
func checkUnsafeForEach(t *testing.T, tx ReadTx, expectedKeys, expectedValues [][]byte) {
var ks, vs [][]byte
tx.UnsafeForEach([]byte("test"), func(k, v []byte) error {
ks = append(ks, k)
vs = append(vs, v)
return nil
})
if diff := cmp.Diff(ks, expectedKeys); diff != "" {
t.Errorf("keys on transaction doesn't match expected, diff: %s", diff)
}
if diff := cmp.Diff(vs, expectedValues); diff != "" {
t.Errorf("values on transaction doesn't match expected, diff: %s", diff)
}
}