mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Separate Writer interface from BatchTx interfaces
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
3b708df1e0
commit
53cbd81009
@ -353,7 +353,7 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amount int64) revision {
|
||||
func (s *v3Manager) unsafeBumpRevision(tx backend.UnsafeWriter, latest revision, amount int64) revision {
|
||||
s.lg.Info(
|
||||
"bumping latest revision",
|
||||
zap.Int64("latest-revision", latest.main),
|
||||
@ -370,7 +370,7 @@ func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amou
|
||||
return latest
|
||||
}
|
||||
|
||||
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision) {
|
||||
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest revision) {
|
||||
s.lg.Info(
|
||||
"marking revision compacted",
|
||||
zap.Int64("revision", latest.main),
|
||||
@ -379,7 +379,7 @@ func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revis
|
||||
mvcc.UnsafeSetScheduledCompact(tx, latest.main)
|
||||
}
|
||||
|
||||
func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) {
|
||||
func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (revision, error) {
|
||||
var latest revision
|
||||
err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) {
|
||||
rev := bytesToRev(k)
|
||||
|
@ -205,18 +205,6 @@ type AuthBackend interface {
|
||||
GetAllRoles() []*authpb.Role
|
||||
}
|
||||
|
||||
type AuthBatchTx interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
UnsafeAuthReader
|
||||
UnsafeSaveAuthEnabled(enabled bool)
|
||||
UnsafeSaveAuthRevision(rev uint64)
|
||||
UnsafePutUser(*authpb.User)
|
||||
UnsafeDeleteUser(string)
|
||||
UnsafePutRole(*authpb.Role)
|
||||
UnsafeDeleteRole(string)
|
||||
}
|
||||
|
||||
type AuthReadTx interface {
|
||||
RLock()
|
||||
RUnlock()
|
||||
@ -232,6 +220,26 @@ type UnsafeAuthReader interface {
|
||||
UnsafeGetAllRoles() []*authpb.Role
|
||||
}
|
||||
|
||||
type AuthBatchTx interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
UnsafeAuthReadWriter
|
||||
}
|
||||
|
||||
type UnsafeAuthReadWriter interface {
|
||||
UnsafeAuthReader
|
||||
UnsafeAuthWriter
|
||||
}
|
||||
|
||||
type UnsafeAuthWriter interface {
|
||||
UnsafeSaveAuthEnabled(enabled bool)
|
||||
UnsafeSaveAuthRevision(rev uint64)
|
||||
UnsafePutUser(*authpb.User)
|
||||
UnsafeDeleteUser(string)
|
||||
UnsafePutRole(*authpb.Role)
|
||||
UnsafeDeleteRole(string)
|
||||
}
|
||||
|
||||
type authStore struct {
|
||||
// atomic operations; need 64-bit align, or 32-bit tests will crash
|
||||
revision uint64
|
||||
@ -990,7 +998,7 @@ func hasRootRole(u *authpb.User) bool {
|
||||
return idx != len(u.Roles) && u.Roles[idx] == rootRole
|
||||
}
|
||||
|
||||
func (as *authStore) commitRevision(tx AuthBatchTx) {
|
||||
func (as *authStore) commitRevision(tx UnsafeAuthWriter) {
|
||||
atomic.AddUint64(&as.revision, 1)
|
||||
tx.UnsafeSaveAuthRevision(as.Revision())
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ type ConsistentIndexer interface {
|
||||
|
||||
// UnsafeSave must be called holding the lock on the tx.
|
||||
// It saves consistentIndex to the underlying stable storage.
|
||||
UnsafeSave(tx backend.BatchTx)
|
||||
UnsafeSave(tx backend.UnsafeReadWriter)
|
||||
|
||||
// SetBackend set the available backend.BatchTx for ConsistentIndexer.
|
||||
SetBackend(be Backend)
|
||||
@ -115,7 +115,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
|
||||
atomic.StoreUint64(&ci.term, term)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
|
||||
func (ci *consistentIndex) UnsafeSave(tx backend.UnsafeReadWriter) {
|
||||
index := atomic.LoadUint64(&ci.consistentIndex)
|
||||
term := atomic.LoadUint64(&ci.term)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
@ -166,8 +166,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
|
||||
atomic.StoreUint64(&f.term, term)
|
||||
}
|
||||
|
||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.UnsafeReadWriter) {}
|
||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||
|
||||
func UpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) {
|
||||
tx.LockOutsideApply()
|
||||
|
@ -46,18 +46,26 @@ type Bucket interface {
|
||||
type BatchTx interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
UnsafeReader
|
||||
UnsafeCreateBucket(bucket Bucket)
|
||||
UnsafeDeleteBucket(bucket Bucket)
|
||||
UnsafePut(bucket Bucket, key []byte, value []byte)
|
||||
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
|
||||
UnsafeDelete(bucket Bucket, key []byte)
|
||||
// Commit commits a previous tx and begins a new writable one.
|
||||
Commit()
|
||||
// CommitAndStop commits the previous tx and does not create a new one.
|
||||
CommitAndStop()
|
||||
LockInsideApply()
|
||||
LockOutsideApply()
|
||||
UnsafeReadWriter
|
||||
}
|
||||
|
||||
type UnsafeReadWriter interface {
|
||||
UnsafeReader
|
||||
UnsafeWriter
|
||||
}
|
||||
|
||||
type UnsafeWriter interface {
|
||||
UnsafeCreateBucket(bucket Bucket)
|
||||
UnsafeDeleteBucket(bucket Bucket)
|
||||
UnsafePut(bucket Bucket, key []byte, value []byte)
|
||||
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
|
||||
UnsafeDelete(bucket Bucket, key []byte)
|
||||
}
|
||||
|
||||
type batchTx struct {
|
||||
|
@ -14,20 +14,20 @@
|
||||
|
||||
package backend
|
||||
|
||||
type HookFunc func(tx BatchTx)
|
||||
type HookFunc func(tx UnsafeReadWriter)
|
||||
|
||||
// Hooks allow to add additional logic executed during transaction lifetime.
|
||||
type Hooks interface {
|
||||
// OnPreCommitUnsafe is executed before Commit of transactions.
|
||||
// The given transaction is already locked.
|
||||
OnPreCommitUnsafe(tx BatchTx)
|
||||
OnPreCommitUnsafe(tx UnsafeReadWriter)
|
||||
}
|
||||
|
||||
type hooks struct {
|
||||
onPreCommitUnsafe HookFunc
|
||||
}
|
||||
|
||||
func (h hooks) OnPreCommitUnsafe(tx BatchTx) {
|
||||
func (h hooks) OnPreCommitUnsafe(tx UnsafeReadWriter) {
|
||||
h.onPreCommitUnsafe(tx)
|
||||
}
|
||||
|
||||
|
@ -113,7 +113,7 @@ func prepareBuckenAndKey(tx backend.BatchTx) {
|
||||
|
||||
func newTestHooksBackend(t testing.TB, baseConfig backend.BackendConfig) backend.Backend {
|
||||
cfg := baseConfig
|
||||
cfg.Hooks = backend.NewHooks(func(tx backend.BatchTx) {
|
||||
cfg.Hooks = backend.NewHooks(func(tx backend.UnsafeReadWriter) {
|
||||
k, v := tx.UnsafeRange(bucket, key, nil, 1)
|
||||
t.Logf("OnPreCommit executed: %v %v", string(k[0]), string(v[0]))
|
||||
assert.Len(t, k, 1)
|
||||
|
@ -41,7 +41,7 @@ func NewBackendHooks(lg *zap.Logger, indexer cindex.ConsistentIndexer) *BackendH
|
||||
return &BackendHooks{lg: lg, indexer: indexer}
|
||||
}
|
||||
|
||||
func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
|
||||
func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.UnsafeReadWriter) {
|
||||
bh.indexer.UnsafeSave(tx)
|
||||
bh.confStateLock.Lock()
|
||||
defer bh.confStateLock.Unlock()
|
||||
|
@ -41,7 +41,7 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) {
|
||||
UnsafeSetScheduledCompact(tx, value)
|
||||
}
|
||||
|
||||
func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
|
||||
func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) {
|
||||
rbytes := newRevBytes()
|
||||
revToBytes(revision{main: value}, rbytes)
|
||||
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
|
||||
@ -53,7 +53,7 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) {
|
||||
UnsafeSetFinishedCompact(tx, value)
|
||||
}
|
||||
|
||||
func UnsafeSetFinishedCompact(tx backend.BatchTx, value int64) {
|
||||
func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) {
|
||||
rbytes := newRevBytes()
|
||||
revToBytes(revision{main: value}, rbytes)
|
||||
tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes)
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
type action interface {
|
||||
// unsafeDo executes the action and returns revert action, when executed
|
||||
// should restore the state from before.
|
||||
unsafeDo(tx backend.BatchTx) (revert action, err error)
|
||||
unsafeDo(tx backend.UnsafeReadWriter) (revert action, err error)
|
||||
}
|
||||
|
||||
type setKeyAction struct {
|
||||
@ -32,7 +32,7 @@ type setKeyAction struct {
|
||||
FieldValue []byte
|
||||
}
|
||||
|
||||
func (a setKeyAction) unsafeDo(tx backend.BatchTx) (action, error) {
|
||||
func (a setKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
|
||||
revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName)
|
||||
tx.UnsafePut(a.Bucket, a.FieldName, a.FieldValue)
|
||||
return revert, nil
|
||||
@ -43,13 +43,13 @@ type deleteKeyAction struct {
|
||||
FieldName []byte
|
||||
}
|
||||
|
||||
func (a deleteKeyAction) unsafeDo(tx backend.BatchTx) (action, error) {
|
||||
func (a deleteKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
|
||||
revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName)
|
||||
tx.UnsafeDelete(a.Bucket, a.FieldName)
|
||||
return revert, nil
|
||||
}
|
||||
|
||||
func restoreFieldValueAction(tx backend.BatchTx, bucket backend.Bucket, fieldName []byte) action {
|
||||
func restoreFieldValueAction(tx backend.UnsafeReader, bucket backend.Bucket, fieldName []byte) action {
|
||||
_, vs := tx.UnsafeRange(bucket, fieldName, nil, 1)
|
||||
if len(vs) == 1 {
|
||||
return &setKeyAction{
|
||||
@ -68,7 +68,7 @@ type ActionList []action
|
||||
|
||||
// unsafeExecute executes actions one by one. If one of actions returns error,
|
||||
// it will revert them.
|
||||
func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error {
|
||||
func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) error {
|
||||
var revertActions = make(ActionList, 0, len(as))
|
||||
for _, a := range as {
|
||||
revert, err := a.unsafeDo(tx)
|
||||
@ -84,7 +84,7 @@ func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error {
|
||||
|
||||
// unsafeExecuteInReversedOrder executes actions in revered order. Will panic on
|
||||
// action error. Should be used when reverting.
|
||||
func (as ActionList) unsafeExecuteInReversedOrder(lg *zap.Logger, tx backend.BatchTx) {
|
||||
func (as ActionList) unsafeExecuteInReversedOrder(lg *zap.Logger, tx backend.UnsafeReadWriter) {
|
||||
for j := len(as) - 1; j >= 0; j-- {
|
||||
_, err := as[j].unsafeDo(tx)
|
||||
if err != nil {
|
||||
|
@ -147,17 +147,17 @@ type brokenAction struct{}
|
||||
|
||||
var errBrokenAction = fmt.Errorf("broken action error")
|
||||
|
||||
func (c brokenAction) unsafeDo(tx backend.BatchTx) (action, error) {
|
||||
func (c brokenAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
|
||||
return nil, errBrokenAction
|
||||
}
|
||||
|
||||
func putKeyValues(tx backend.BatchTx, bucket backend.Bucket, kvs map[string]string) {
|
||||
func putKeyValues(tx backend.UnsafeWriter, bucket backend.Bucket, kvs map[string]string) {
|
||||
for k, v := range kvs {
|
||||
tx.UnsafePut(bucket, []byte(k), []byte(v))
|
||||
}
|
||||
}
|
||||
|
||||
func assertBucketState(t *testing.T, tx backend.BatchTx, bucket backend.Bucket, expect map[string]string) {
|
||||
func assertBucketState(t *testing.T, tx backend.UnsafeReadWriter, bucket backend.Bucket, expect map[string]string) {
|
||||
t.Helper()
|
||||
got := map[string]string{}
|
||||
ks, vs := tx.UnsafeRange(bucket, []byte("\x00"), []byte("\xff"), 0)
|
||||
|
@ -47,7 +47,7 @@ func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) {
|
||||
s.mustUnsafePutAlarm(tx, alarm)
|
||||
}
|
||||
|
||||
func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
|
||||
func (s *alarmBackend) mustUnsafePutAlarm(tx backend.UnsafeWriter, alarm *etcdserverpb.AlarmMember) {
|
||||
v, err := alarm.Marshal()
|
||||
if err != nil {
|
||||
s.lg.Panic("failed to marshal alarm member", zap.Error(err))
|
||||
@ -63,7 +63,7 @@ func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) {
|
||||
s.mustUnsafeDeleteAlarm(tx, alarm)
|
||||
}
|
||||
|
||||
func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
|
||||
func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.UnsafeWriter, alarm *etcdserverpb.AlarmMember) {
|
||||
v, err := alarm.Marshal()
|
||||
if err != nil {
|
||||
s.lg.Panic("failed to marshal alarm member", zap.Error(err))
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
)
|
||||
|
||||
func UnsafeCreateAuthRolesBucket(tx backend.BatchTx) {
|
||||
func UnsafeCreateAuthRolesBucket(tx backend.UnsafeWriter) {
|
||||
tx.UnsafeCreateBucket(AuthRoles)
|
||||
}
|
||||
|
||||
|
@ -30,17 +30,17 @@ import (
|
||||
func TestGetAllRoles(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
setup func(tx auth.AuthBatchTx)
|
||||
setup func(tx auth.UnsafeAuthWriter)
|
||||
want []*authpb.Role
|
||||
}{
|
||||
{
|
||||
name: "Empty by default",
|
||||
setup: func(tx auth.AuthBatchTx) {},
|
||||
setup: func(tx auth.UnsafeAuthWriter) {},
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "Returns data put before",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutRole(&authpb.Role{
|
||||
Name: []byte("readKey"),
|
||||
KeyPermission: []*authpb.Permission{
|
||||
@ -67,7 +67,7 @@ func TestGetAllRoles(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Skips deleted",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutRole(&authpb.Role{
|
||||
Name: []byte("role1"),
|
||||
})
|
||||
@ -80,7 +80,7 @@ func TestGetAllRoles(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Returns data overriden by put",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutRole(&authpb.Role{
|
||||
Name: []byte("role1"),
|
||||
KeyPermission: []*authpb.Permission{
|
||||
@ -135,17 +135,17 @@ func TestGetAllRoles(t *testing.T) {
|
||||
func TestGetRole(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
setup func(tx auth.AuthBatchTx)
|
||||
setup func(tx auth.UnsafeAuthWriter)
|
||||
want *authpb.Role
|
||||
}{
|
||||
{
|
||||
name: "Returns nil for missing",
|
||||
setup: func(tx auth.AuthBatchTx) {},
|
||||
setup: func(tx auth.UnsafeAuthWriter) {},
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "Returns data put before",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutRole(&authpb.Role{
|
||||
Name: []byte("role1"),
|
||||
KeyPermission: []*authpb.Permission{
|
||||
@ -170,7 +170,7 @@ func TestGetRole(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Return nil for deleted",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutRole(&authpb.Role{
|
||||
Name: []byte("role1"),
|
||||
})
|
||||
@ -180,7 +180,7 @@ func TestGetRole(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Returns data overriden by put",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutRole(&authpb.Role{
|
||||
Name: []byte("role1"),
|
||||
KeyPermission: []*authpb.Permission{
|
||||
|
@ -30,17 +30,17 @@ import (
|
||||
func TestGetAllUsers(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
setup func(tx auth.AuthBatchTx)
|
||||
setup func(tx auth.UnsafeAuthWriter)
|
||||
want []*authpb.User
|
||||
}{
|
||||
{
|
||||
name: "Empty by default",
|
||||
setup: func(tx auth.AuthBatchTx) {},
|
||||
setup: func(tx auth.UnsafeAuthWriter) {},
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "Returns user put before",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutUser(&authpb.User{
|
||||
Name: []byte("alice"),
|
||||
Password: []byte("alicePassword"),
|
||||
@ -63,7 +63,7 @@ func TestGetAllUsers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Skips deleted user",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutUser(&authpb.User{
|
||||
Name: []byte("alice"),
|
||||
})
|
||||
@ -76,7 +76,7 @@ func TestGetAllUsers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Returns data overriden by put",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutUser(&authpb.User{
|
||||
Name: []byte("alice"),
|
||||
Password: []byte("oldPassword"),
|
||||
@ -123,17 +123,17 @@ func TestGetAllUsers(t *testing.T) {
|
||||
func TestGetUser(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
setup func(tx auth.AuthBatchTx)
|
||||
setup func(tx auth.UnsafeAuthWriter)
|
||||
want *authpb.User
|
||||
}{
|
||||
{
|
||||
name: "Returns nil for missing user",
|
||||
setup: func(tx auth.AuthBatchTx) {},
|
||||
setup: func(tx auth.UnsafeAuthWriter) {},
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "Returns data put before",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutUser(&authpb.User{
|
||||
Name: []byte("alice"),
|
||||
Password: []byte("alicePassword"),
|
||||
@ -154,7 +154,7 @@ func TestGetUser(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Skips deleted",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutUser(&authpb.User{
|
||||
Name: []byte("alice"),
|
||||
})
|
||||
@ -164,7 +164,7 @@ func TestGetUser(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Returns data overriden by put",
|
||||
setup: func(tx auth.AuthBatchTx) {
|
||||
setup: func(tx auth.UnsafeAuthWriter) {
|
||||
tx.UnsafePutUser(&authpb.User{
|
||||
Name: []byte("alice"),
|
||||
Password: []byte("oldPassword"),
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
)
|
||||
|
||||
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet).
|
||||
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
|
||||
func UnsafeCreateMetaBucket(tx backend.UnsafeWriter) {
|
||||
tx.UnsafeCreateBucket(Meta)
|
||||
}
|
||||
|
||||
@ -59,15 +59,15 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
|
||||
return UnsafeReadConsistentIndex(tx)
|
||||
}
|
||||
|
||||
func UnsafeUpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) {
|
||||
func UnsafeUpdateConsistentIndexForce(tx backend.UnsafeReadWriter, index uint64, term uint64) {
|
||||
unsafeUpdateConsistentIndex(tx, index, term, true)
|
||||
}
|
||||
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
func UnsafeUpdateConsistentIndex(tx backend.UnsafeReadWriter, index uint64, term uint64) {
|
||||
unsafeUpdateConsistentIndex(tx, index, term, false)
|
||||
}
|
||||
|
||||
func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, allowDecreasing bool) {
|
||||
func unsafeUpdateConsistentIndex(tx backend.UnsafeReadWriter, index uint64, term uint64, allowDecreasing bool) {
|
||||
if index == 0 {
|
||||
// Never save 0 as it means that we didn't load the real index yet.
|
||||
return
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
|
||||
// MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx).
|
||||
// confState in backend is persisted since etcd v3.5.
|
||||
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) {
|
||||
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.UnsafeWriter, confState *raftpb.ConfState) {
|
||||
confStateBytes, err := json.Marshal(confState)
|
||||
if err != nil {
|
||||
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
)
|
||||
|
||||
func UnsafeCreateLeaseBucket(tx backend.BatchTx) {
|
||||
func UnsafeCreateLeaseBucket(tx backend.UnsafeWriter) {
|
||||
tx.UnsafeCreateBucket(Lease)
|
||||
}
|
||||
|
||||
@ -43,7 +43,7 @@ func MustUnsafeGetAllLeases(tx backend.UnsafeReader) []*leasepb.Lease {
|
||||
return ls
|
||||
}
|
||||
|
||||
func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
|
||||
func MustUnsafePutLease(tx backend.UnsafeWriter, lpb *leasepb.Lease) {
|
||||
key := leaseIdToBytes(lpb.ID)
|
||||
|
||||
val, err := lpb.Marshal()
|
||||
@ -53,11 +53,11 @@ func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
|
||||
tx.UnsafePut(Lease, key, val)
|
||||
}
|
||||
|
||||
func UnsafeDeleteLease(tx backend.BatchTx, lpb *leasepb.Lease) {
|
||||
func UnsafeDeleteLease(tx backend.UnsafeWriter, lpb *leasepb.Lease) {
|
||||
tx.UnsafeDelete(Lease, leaseIdToBytes(lpb.ID))
|
||||
}
|
||||
|
||||
func MustUnsafeGetLease(tx backend.BatchTx, leaseID int64) *leasepb.Lease {
|
||||
func MustUnsafeGetLease(tx backend.UnsafeReader, leaseID int64) *leasepb.Lease {
|
||||
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(leaseID), nil, 0)
|
||||
if len(vs) != 1 {
|
||||
return nil
|
||||
|
@ -30,17 +30,17 @@ import (
|
||||
func TestLeaseBackend(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
setup func(tx backend.BatchTx)
|
||||
setup func(tx backend.UnsafeWriter)
|
||||
want []*leasepb.Lease
|
||||
}{
|
||||
{
|
||||
name: "Empty by default",
|
||||
setup: func(tx backend.BatchTx) {},
|
||||
setup: func(tx backend.UnsafeWriter) {},
|
||||
want: []*leasepb.Lease{},
|
||||
},
|
||||
{
|
||||
name: "Returns data put before",
|
||||
setup: func(tx backend.BatchTx) {
|
||||
setup: func(tx backend.UnsafeWriter) {
|
||||
MustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
@ -55,7 +55,7 @@ func TestLeaseBackend(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Skips deleted",
|
||||
setup: func(tx backend.BatchTx) {
|
||||
setup: func(tx backend.UnsafeWriter) {
|
||||
MustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
|
@ -56,7 +56,7 @@ func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error {
|
||||
return p.unsafeExecute(lg, tx)
|
||||
}
|
||||
|
||||
func (p migrationPlan) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) (err error) {
|
||||
func (p migrationPlan) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) (err error) {
|
||||
for _, s := range p {
|
||||
err = s.unsafeExecute(lg, tx)
|
||||
if err != nil {
|
||||
@ -98,7 +98,7 @@ func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error {
|
||||
}
|
||||
|
||||
// unsafeExecute is non thread-safe version of execute.
|
||||
func (s migrationStep) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error {
|
||||
func (s migrationStep) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) error {
|
||||
err := s.actions.unsafeExecute(lg, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -221,7 +221,7 @@ type actionMock struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (a actionMock) unsafeDo(tx backend.BatchTx) (action, error) {
|
||||
func (a actionMock) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
|
||||
a.recorder.actions = append(a.recorder.actions, a.name)
|
||||
return actionMock{
|
||||
recorder: a.recorder,
|
||||
|
@ -62,7 +62,7 @@ func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Ver
|
||||
}
|
||||
|
||||
// UnsafeMigrate is non thread-safe version of Migrate.
|
||||
func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
|
||||
func UnsafeMigrate(lg *zap.Logger, tx backend.UnsafeReadWriter, w WALVersion, target semver.Version) error {
|
||||
current, err := UnsafeDetectSchemaVersion(lg, tx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot detect storage schema version: %v", err)
|
||||
|
@ -37,7 +37,7 @@ func TestValidate(t *testing.T) {
|
||||
name string
|
||||
version semver.Version
|
||||
// Overrides which keys should be set (default based on version)
|
||||
overrideKeys func(tx backend.BatchTx)
|
||||
overrideKeys func(tx backend.UnsafeReadWriter)
|
||||
expectError bool
|
||||
expectErrorMsg string
|
||||
}{
|
||||
@ -50,19 +50,19 @@ func TestValidate(t *testing.T) {
|
||||
{
|
||||
name: `V3.5 schema without confstate and term fields is correct`,
|
||||
version: version.V3_5,
|
||||
overrideKeys: func(tx backend.BatchTx) {},
|
||||
overrideKeys: func(tx backend.UnsafeReadWriter) {},
|
||||
},
|
||||
{
|
||||
name: `V3.5 schema without term field is correct`,
|
||||
version: version.V3_5,
|
||||
overrideKeys: func(tx backend.BatchTx) {
|
||||
overrideKeys: func(tx backend.UnsafeReadWriter) {
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
},
|
||||
},
|
||||
{
|
||||
name: `V3.5 schema with all fields is correct`,
|
||||
version: version.V3_5,
|
||||
overrideKeys: func(tx backend.BatchTx) {
|
||||
overrideKeys: func(tx backend.UnsafeReadWriter) {
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
},
|
||||
@ -101,7 +101,7 @@ func TestMigrate(t *testing.T) {
|
||||
name string
|
||||
version semver.Version
|
||||
// Overrides which keys should be set (default based on version)
|
||||
overrideKeys func(tx backend.BatchTx)
|
||||
overrideKeys func(tx backend.UnsafeReadWriter)
|
||||
targetVersion semver.Version
|
||||
walEntries []etcdserverpb.InternalRaftRequest
|
||||
|
||||
@ -114,7 +114,7 @@ func TestMigrate(t *testing.T) {
|
||||
{
|
||||
name: `Upgrading v3.5 to v3.6 should be rejected if confstate is not set`,
|
||||
version: version.V3_5,
|
||||
overrideKeys: func(tx backend.BatchTx) {},
|
||||
overrideKeys: func(tx backend.UnsafeReadWriter) {},
|
||||
targetVersion: version.V3_6,
|
||||
expectVersion: nil,
|
||||
expectError: true,
|
||||
@ -123,7 +123,7 @@ func TestMigrate(t *testing.T) {
|
||||
{
|
||||
name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`,
|
||||
version: version.V3_5,
|
||||
overrideKeys: func(tx backend.BatchTx) {
|
||||
overrideKeys: func(tx backend.UnsafeReadWriter) {
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
},
|
||||
targetVersion: version.V3_6,
|
||||
@ -294,7 +294,7 @@ func TestMigrateIsReversible(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func setupBackendData(t *testing.T, ver semver.Version, overrideKeys func(tx backend.BatchTx)) string {
|
||||
func setupBackendData(t *testing.T, ver semver.Version, overrideKeys func(tx backend.UnsafeReadWriter)) string {
|
||||
t.Helper()
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
|
@ -56,12 +56,12 @@ func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
|
||||
|
||||
// UnsafeSetStorageVersion updates etcd storage version in backend.
|
||||
// Populated since v3.6
|
||||
func UnsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
|
||||
func UnsafeSetStorageVersion(tx backend.UnsafeWriter, v *semver.Version) {
|
||||
sv := semver.Version{Major: v.Major, Minor: v.Minor}
|
||||
tx.UnsafePut(Meta, MetaStorageVersionName, []byte(sv.String()))
|
||||
}
|
||||
|
||||
// UnsafeClearStorageVersion removes etcd storage version in backend.
|
||||
func UnsafeClearStorageVersion(tx backend.BatchTx) {
|
||||
func UnsafeClearStorageVersion(tx backend.UnsafeWriter) {
|
||||
tx.UnsafeDelete(Meta, MetaStorageVersionName)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user