Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)
Merge pull request #13854 from ahrtr/data_corruption
Fix the data inconsistency issue by moving the SetConsistentIndex into the transaction lock
Commit: c83b1ad9ba
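The diff below closes a window where a member could persist applied data without the matching consistent index: the index is now staged while an entry is being applied, and it is written into the backend only from a hook that runs right after the batch transaction's lock is taken, so data and index land in the same commit. A minimal, self-contained sketch of that idea (all types here are invented stand-ins, not etcd's real API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fakeTx is a toy stand-in for etcd's backend batch transaction.
type fakeTx struct {
	mu           sync.Mutex
	postLockHook func() // runs right after the lock is acquired
}

func (t *fakeTx) LockInsideApply() {
	t.mu.Lock()
	if t.postLockHook != nil {
		t.postLockHook() // promote the staged index under the same lock
	}
}

func (t *fakeTx) Unlock() { t.mu.Unlock() }

func main() {
	var consistentIndex, applyingIndex uint64
	tx := &fakeTx{}
	// What the diff calls txPostLockInsideApplyHook: move the staged
	// ("applying") index into the persisted consistent index while the
	// write transaction is locked, never outside of it.
	tx.postLockHook = func() {
		if a := atomic.LoadUint64(&applyingIndex); a > consistentIndex {
			consistentIndex = a
		}
	}

	atomic.StoreUint64(&applyingIndex, 42) // raft entry 42 is being applied
	tx.LockInsideApply()                   // hook runs: index and data now share one commit
	tx.Unlock()
	fmt.Println(consistentIndex) // 42
}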
@@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
 	if !v3 {
 		tx := be.BatchTx()
-		tx.Lock()
+		tx.LockOutsideApply()
 		defer tx.Unlock()
 		schema.UnsafeCreateMetaBucket(tx)
 		schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)

@@ -118,7 +118,7 @@ type migrateConfig struct {
 func migrateCommandFunc(c *migrateConfig) error {
 	defer c.be.Close()
 	tx := c.be.BatchTx()
-	current, err := schema.DetectSchemaVersion(c.lg, tx)
+	current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
 	if err != nil {
 		c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
 		return err

@@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error {
 }

 func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	// Storage version is only supported since v3.6
 	if target.LessThan(schema.V3_6) {
@@ -20,7 +20,7 @@ import (
 	"go.uber.org/zap"
 )

-func getMergedPerms(tx AuthBatchTx, userName string) *unifiedRangePermissions {
+func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions {
 	user := tx.UnsafeGetUser(userName)
 	if user == nil {
 		return nil

@@ -103,7 +103,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
 	return false
 }

-func (as *authStore) isRangeOpPermitted(tx AuthBatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
+func (as *authStore) isRangeOpPermitted(tx AuthReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
 	// assumption: tx is Lock()ed
 	_, ok := as.rangePermCache[userName]
 	if !ok {
@@ -196,6 +196,7 @@ type TokenProvider interface {
 type AuthBackend interface {
 	CreateAuthBuckets()
 	ForceCommit()
+	ReadTx() AuthReadTx
 	BatchTx() AuthBatchTx

 	GetUser(string) *authpb.User

@@ -345,7 +346,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
 	// CompareHashAndPassword is very expensive, so we use closures
 	// to avoid putting it in the critical section of the tx lock.
 	revision, err := func() (uint64, error) {
-		tx := as.be.BatchTx()
+		tx := as.be.ReadTx()
 		tx.Lock()
 		defer tx.Unlock()

@@ -373,7 +374,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {

 func (as *authStore) Recover(be AuthBackend) {
 	as.be = be
-	tx := be.BatchTx()
+	tx := be.ReadTx()
 	tx.Lock()

 	enabled := tx.UnsafeReadAuthEnabled()

@@ -855,7 +856,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
 		return ErrAuthOldRevision
 	}

-	tx := as.be.BatchTx()
+	tx := as.be.ReadTx()
 	tx.Lock()
 	defer tx.Unlock()

@@ -897,7 +898,10 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
 		return ErrUserEmpty
 	}

-	u := as.be.GetUser(authInfo.Username)
+	tx := as.be.ReadTx()
+	tx.Lock()
+	defer tx.Unlock()
+	u := tx.UnsafeGetUser(authInfo.Username)

 	if u == nil {
 		return ErrUserNotFound

@@ -935,6 +939,8 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i

 	be.CreateAuthBuckets()
 	tx := be.BatchTx()
+	// We should call LockOutsideApply here, but the txPostLockInsideApplyHook isn't set
+	// to EtcdServer yet, so it's OK.
 	tx.Lock()
 	enabled := tx.UnsafeReadAuthEnabled()
 	as := &authStore{
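For reference, the shape of the read/write split these auth changes introduce, reduced to a compilable sketch (interface bodies abbreviated; the real interfaces carry more methods, such as UnsafeGetUser and UnsafeSaveAuthEnabled):

package auth

// Sketch only: read-only transactions take the backend's read lock,
// so auth checks such as CheckPassword and IsAdminPermitted no longer
// contend with the single write (batch) transaction.
type AuthReadTx interface {
	Lock() // an RLock underneath, per the schema changes later in this diff
	Unlock()
	UnsafeReadAuthEnabled() bool
	UnsafeReadAuthRevision() uint64
}

type AuthBatchTx interface {
	Lock() // a LockInsideApply underneath
	Unlock()
	UnsafeSaveAuthEnabled(enabled bool)
	UnsafeSaveAuthRevision(rev uint64)
}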
@@ -36,6 +36,10 @@ func (b *backendMock) CreateAuthBuckets() {
 func (b *backendMock) ForceCommit() {
 }

+func (b *backendMock) ReadTx() AuthReadTx {
+	return &txMock{be: b}
+}
+
 func (b *backendMock) BatchTx() AuthBatchTx {
 	return &txMock{be: b}
 }
@@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
 	s.bemu.RLock()
 	defer s.bemu.RUnlock()

-	tx := s.be.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
+	tx := s.be.ReadTx()
+	tx.RLock()
+	defer tx.RUnlock()
 	v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
 	if err != nil {
 		return nil

@@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error
 	defer s.bemu.RUnlock()

 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
 }

@@ -231,7 +231,7 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s
 		}
 	}
 	if beExist {
-		err = schema.Validate(cfg.Logger, be.BatchTx())
+		err = schema.Validate(cfg.Logger, be.ReadTx())
 		if err != nil {
 			cfg.Logger.Error("Failed to validate schema", zap.Error(err))
 			return nil, err
@@ -23,6 +23,7 @@ import (
 )

 type Backend interface {
+	ReadTx() backend.ReadTx
 	BatchTx() backend.BatchTx
 }

@@ -32,9 +33,18 @@ type ConsistentIndexer interface {
 	// ConsistentIndex returns the consistent index of current executing entry.
 	ConsistentIndex() uint64

+	// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
+	ConsistentApplyingIndex() (uint64, uint64)
+
+	// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
+	UnsafeConsistentIndex() uint64
+
 	// SetConsistentIndex sets the consistent index of current executing entry.
 	SetConsistentIndex(v uint64, term uint64)

+	// SetConsistentApplyingIndex sets the consistent applying index of current executing entry.
+	SetConsistentApplyingIndex(v uint64, term uint64)
+
 	// UnsafeSave must be called holding the lock on the tx.
 	// It saves consistentIndex to the underlying stable storage.
 	UnsafeSave(tx backend.BatchTx)

@@ -54,6 +64,19 @@ type consistentIndex struct {
 	// The value is being persisted in the backend since v3.5.
 	term uint64

+	// applyingIndex and applyingTerm are just a temporary cache of the raftpb.Entry.Index
+	// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
+	// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
+	//
+	// TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the
+	// performance difference. Afterwards we can make a decision on whether
+	// or not we should remove OnPreCommitUnsafe. If it is true, then we
+	// can remove applyingIndex and applyingTerm, and save the e.Index and
+	// e.Term to consistentIndex and term directly in applyEntries, and
+	// persist them into db in the txPostLockInsideApplyHook.
+	applyingIndex uint64
+	applyingTerm  uint64
+
 	// be is used for initial read consistentIndex
 	be Backend
 	// mutex is protecting be.

@@ -73,7 +96,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()

-	v, term := schema.ReadConsistentIndex(ci.be.BatchTx())
+	v, term := schema.ReadConsistentIndex(ci.be.ReadTx())
 	ci.SetConsistentIndex(v, term)
 	return v
 }

+func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
+	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
+		return index
+	}
+
+	v, term := schema.UnsafeReadConsistentIndex(ci.be.ReadTx())
+	ci.SetConsistentIndex(v, term)
+	return v
+}

@@ -97,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
 	ci.SetConsistentIndex(0, 0)
 }

+func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
+	return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
+}
+
+func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
+	atomic.StoreUint64(&ci.applyingIndex, v)
+	atomic.StoreUint64(&ci.applyingTerm, term)
+}
+
 func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
 	return &fakeConsistentIndex{index: index}
 }

@@ -106,18 +148,30 @@ type fakeConsistentIndex struct {
 	term  uint64
 }

-func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
+func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
+	return atomic.LoadUint64(&f.index)
+}
+func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
+	return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
+}
+func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
+	return atomic.LoadUint64(&f.index)
+}

 func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
 	atomic.StoreUint64(&f.index, index)
 	atomic.StoreUint64(&f.term, term)
 }
+func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
+	atomic.StoreUint64(&f.index, index)
+	atomic.StoreUint64(&f.term, term)
+}

 func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
 func (f *fakeConsistentIndex) SetBackend(_ Backend)         {}

 func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
 }
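The two-stage bookkeeping added above can be summarized in a few lines; this is a simplified sketch with invented scaffolding, not the full implementation:

package main

import (
	"fmt"
	"sync/atomic"
)

type ci struct {
	consistentIndex uint64 // persisted with the backend write tx
	applyingIndex   uint64 // staged by the apply loop, not yet persisted
}

// The apply loop records the entry it is currently working on.
func (c *ci) SetConsistentApplyingIndex(v uint64) { atomic.StoreUint64(&c.applyingIndex, v) }

// What the post-lock hook does: promote the staged value, but only
// forward, so re-locking the tx is idempotent.
func (c *ci) promoteUnderTxLock() {
	if a := atomic.LoadUint64(&c.applyingIndex); a > atomic.LoadUint64(&c.consistentIndex) {
		atomic.StoreUint64(&c.consistentIndex, a)
	}
}

func main() {
	c := &ci{}
	c.SetConsistentApplyingIndex(7)
	c.promoteUnderTxLock()
	c.promoteUnderTxLock() // no-op: already at 7
	fmt.Println(atomic.LoadUint64(&c.consistentIndex)) // 7
}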
@@ -401,6 +401,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		})
 	}

+	// Set the hook after EtcdServer finishes the initialization to avoid
+	// the hook being called during the initialization process.
+	srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
+
 	// TODO: move transport initialization near the definition of remote
 	tr := &rafthttp.Transport{
 		Logger: cfg.Logger,

@@ -978,6 +982,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}

 	s.consistIndex.SetBackend(newbe)
+	newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
+
 	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

 	// Closing old backend might block until all the txns

@@ -1771,7 +1777,7 @@ func (s *EtcdServer) apply(

 	// set the consistent index of current executing entry
 	if e.Index > s.consistIndex.ConsistentIndex() {
-		s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
 		shouldApplyV3 = membership.ApplyBoth
 	}

@@ -1798,10 +1804,18 @@ func (s *EtcdServer) apply(
 // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	shouldApplyV3 := membership.ApplyV2storeOnly
+	applyV3Performed := false
+	defer func() {
+		// The txPostLock callback will not get called in this case,
+		// so we should set the consistent index directly.
+		if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 {
+			s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		}
+	}()
 	index := s.consistIndex.ConsistentIndex()
 	if e.Index > index {
 		// set the consistent index of current executing entry
-		s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
 		shouldApplyV3 = membership.ApplyBoth
 	}
 	s.lg.Debug("apply entry normal",

@@ -1853,6 +1867,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		if !needResult && raftReq.Txn != nil {
 			removeNeedlessRangeReqs(raftReq.Txn)
 		}
+		applyV3Performed = true
 		ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
 	}

@@ -1895,6 +1910,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		s.r.ApplyConfChange(cc)
+
+		// The txPostLock callback will not get called in this case,
+		// so we should set the consistent index directly.
+		if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
+			applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+			s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
+		}
 		return false, err
 	}

@@ -2296,3 +2318,12 @@ func (s *EtcdServer) raftStatus() raft.Status {
 func (s *EtcdServer) Version() *serverversion.Manager {
 	return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
 }
+
+func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
+	return func() {
+		applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+		if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
+			s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
+		}
+	}
+}
@@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {

 	_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
 	consistIndex := srv.consistIndex.ConsistentIndex()
-	if consistIndex != appliedi {
-		t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
-	}
+	assert.Equal(t, uint64(2), appliedi)

 	t.Run("verify-backend", func(t *testing.T) {
 		tx := be.BatchTx()

@@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
 		srv.beHooks.OnPreCommitUnsafe(tx)
 		assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
 	})
-	rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
+	rindex, _ := schema.ReadConsistentIndex(be.ReadTx())
 	assert.Equal(t, consistIndex, rindex)
-	assert.Equal(t, uint64(4), rterm)
 }

 func realisticRaftNode(lg *zap.Logger) *raftNode {
@@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
 func (le *lessor) initAndRecover() {
 	tx := le.b.BatchTx()

-	tx.Lock()
+	tx.LockOutsideApply()
 	schema.UnsafeCreateLeaseBucket(tx)
 	lpbs := schema.MustUnsafeGetAllLeases(tx)
 	tx.Unlock()

@@ -845,7 +845,7 @@ func (l *Lease) expired() bool {
 func (l *Lease) persistTo(b backend.Backend) {
 	lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
 	tx := b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	schema.MustUnsafePutLease(tx, &lpb)
 }
@@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
 func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) {
 	consistentIndex := uint64(0)
 	if beExist {
-		consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx())
+		consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx())
 	}
 	if snapshot.Metadata.Index <= consistentIndex {
 		return oldbe, nil
@@ -67,6 +67,9 @@ type Backend interface {
 	Defrag() error
 	ForceCommit()
 	Close() error
+
+	// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
+	SetTxPostLockInsideApplyHook(func())
 }

 type Snapshot interface {

@@ -119,6 +122,9 @@ type backend struct {

 	hooks Hooks

+	// txPostLockInsideApplyHook is called each time right after locking the tx.
+	txPostLockInsideApplyHook func()
+
 	lg *zap.Logger
 }

@@ -227,6 +233,14 @@ func (b *backend) BatchTx() BatchTx {
 	return b.batchTx
 }

+func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
+	// It needs to lock the batchTx, because the periodic commit
+	// may be accessing the txPostLockInsideApplyHook at the moment.
+	b.batchTx.lock()
+	defer b.batchTx.Unlock()
+	b.txPostLockInsideApplyHook = hook
+}
+
 func (b *backend) ReadTx() ReadTx { return b.readTx }

 // ConcurrentReadTx creates and returns a new ReadTx, which:

@@ -438,7 +452,7 @@ func (b *backend) defrag() error {
 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then
 	// close previous ongoing tx.
-	b.batchTx.Lock()
+	b.batchTx.LockOutsideApply()
 	defer b.batchTx.Unlock()

 	// lock database after lock tx to avoid deadlock.
@@ -65,18 +65,32 @@ type batchTx struct {
 	pending int
 }

+// Lock is supposed to be called only by the unit test.
+func (t *batchTx) Lock() {
+	ValidateCalledInsideUnittest(t.backend.lg)
+	t.lock()
+}
+
+func (t *batchTx) lock() {
+	t.Mutex.Lock()
+}
+
 func (t *batchTx) LockInsideApply() {
-	ValidateCalledInsideApply(t.backend.lg)
-	t.Lock()
+	t.lock()
+	if t.backend.txPostLockInsideApplyHook != nil {
+		// The callers of some methods (e.g., (*RaftCluster).AddMember)
+		// can be coming from both InsideApply and OutsideApply, but the
+		// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
+		// So we should check the txPostLockInsideApplyHook before validating
+		// the callstack.
+		ValidateCalledInsideApply(t.backend.lg)
+		t.backend.txPostLockInsideApplyHook()
+	}
 }

 func (t *batchTx) LockOutsideApply() {
 	ValidateCalledOutSideApply(t.backend.lg)
-	t.Lock()
+	t.lock()
 }

 func (t *batchTx) Unlock() {

@@ -226,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)

 // Commit commits a previous tx and begins a new writable one.
 func (t *batchTx) Commit() {
-	t.Lock()
+	t.lock()
 	t.commit(false)
 	t.Unlock()
 }

 // CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
-	t.Lock()
+	t.lock()
 	t.commit(true)
 	t.Unlock()
 }

@@ -303,13 +317,13 @@ func (t *batchTxBuffered) Unlock() {
 }

 func (t *batchTxBuffered) Commit() {
-	t.Lock()
+	t.lock()
 	t.commit(false)
 	t.Unlock()
 }

 func (t *batchTxBuffered) CommitAndStop() {
-	t.Lock()
+	t.lock()
 	t.commit(true)
 	t.Unlock()
 }
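After this change there are three distinct entry points to the batch-tx lock. A self-contained toy model of the contract (everything except the three method names is invented; the real code validates the call site via stack traces and reports with lg.Fatal rather than panic):

package main

import "fmt"

type toyTx struct {
	insideApply bool   // toy substitute for the real stack-trace check
	hook        func() // stands in for txPostLockInsideApplyHook
}

// LockInsideApply: raft apply path; also runs the post-lock hook.
func (t *toyTx) LockInsideApply() {
	if t.hook != nil {
		if !t.insideApply {
			panic("LockInsideApply called outside apply")
		}
		t.hook()
	}
}

// LockOutsideApply: bootstrap, compaction, migration, defrag, ...
func (t *toyTx) LockOutsideApply() {
	if t.insideApply {
		panic("LockOutsideApply called inside apply")
	}
}

// Lock: reserved for unit tests.
func (t *toyTx) Lock() {}

func main() {
	tx := &toyTx{insideApply: true, hook: func() { fmt.Println("hook ran") }}
	tx.LockInsideApply() // ok: prints "hook ran"
}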
@@ -41,8 +41,6 @@ func TestBackendPreCommitHook(t *testing.T) {
 	// Empty commit.
 	tx.Commit()

-	write(tx, []byte("foo"), []byte("bar"))
-
 	assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits")
 	tx.Commit()
 	assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits")
@@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) {
 	}
 }

+func ValidateCalledInsideUnittest(lg *zap.Logger) {
+	if !verifyLockEnabled() {
+		return
+	}
+	if !insideUnittest() {
+		lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace"))
+	}
+}
+
 func verifyLockEnabled() bool {
 	return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
 }

@@ -54,3 +63,8 @@ func insideApply() bool {
 	stackTraceStr := string(debug.Stack())
 	return strings.Contains(stackTraceStr, ".applyEntries")
 }
+
+func insideUnittest() bool {
+	stackTraceStr := string(debug.Stack())
+	return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
+}
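The insideUnittest check above relies on inspecting the goroutine stack; here is a standalone demonstration of the same trick (simplified: the real code also gates on the verification env var and excludes paths containing "tests/"):

package main

import (
	"fmt"
	"runtime/debug"
	"strings"
)

func insideUnittest() bool {
	// Test binaries have frames from *_test.go files on the stack.
	return strings.Contains(string(debug.Stack()), "_test.go")
}

func main() {
	fmt.Println(insideUnittest()) // false when run as a normal binary
}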
@@ -15,7 +15,6 @@
 package backend_test

 import (
-	"fmt"
 	"os"
 	"testing"
 	"time"

@@ -26,40 +25,60 @@ import (

 func TestLockVerify(t *testing.T) {
 	tcs := []struct {
-		insideApply bool
-		lock        func(tx backend.BatchTx)
-		expectPanic bool
+		name                      string
+		insideApply               bool
+		lock                      func(tx backend.BatchTx)
+		txPostLockInsideApplyHook func()
+		expectPanic               bool
 	}{
 		{
+			name:        "call lockInsideApply from inside apply",
 			insideApply: true,
 			lock:        lockInsideApply,
 			expectPanic: false,
 		},
 		{
+			name:        "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)",
 			insideApply: false,
 			lock:        lockInsideApply,
-			expectPanic: true,
+			expectPanic: false,
 		},
 		{
+			name:                      "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)",
+			insideApply:               false,
+			lock:                      lockInsideApply,
+			txPostLockInsideApplyHook: func() {},
+			expectPanic:               true,
+		},
+		{
+			name:        "call lockOutsideApply from outside apply",
 			insideApply: false,
 			lock:        lockOutsideApply,
 			expectPanic: false,
 		},
 		{
+			name:        "call lockOutsideApply from inside apply",
 			insideApply: true,
 			lock:        lockOutsideApply,
 			expectPanic: true,
 		},
+		{
+			name:        "call Lock from unit test",
+			insideApply: false,
+			lock:        lockFromUT,
+			expectPanic: false,
+		},
 	}
 	env := os.Getenv("ETCD_VERIFY")
 	os.Setenv("ETCD_VERIFY", "lock")
 	defer func() {
 		os.Setenv("ETCD_VERIFY", env)
 	}()
-	for i, tc := range tcs {
-		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {

 			be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
+			be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook)

 			hasPaniced := handlePanic(func() {
 				if tc.insideApply {

@@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) {

 func lockInsideApply(tx backend.BatchTx)  { tx.LockInsideApply() }
 func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() }
+func lockFromUT(tx backend.BatchTx)       { tx.Lock() }
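The test enables the check through the ETCD_VERIFY environment variable; a minimal sketch of that gate (the constant values "all" and "lock" are assumed here, matching the ENV_VERIFY names used in verify.go):

package main

import (
	"fmt"
	"os"
)

// verifyLockEnabled mirrors the gate in verify.go: lock verification
// runs only when ETCD_VERIFY is "all" or "lock".
func verifyLockEnabled() bool {
	v := os.Getenv("ETCD_VERIFY")
	return v == "all" || v == "lock"
}

func main() {
	fmt.Println(verifyLockEnabled()) // false by default
	os.Setenv("ETCD_VERIFY", "lock")
	fmt.Println(verifyLockEnabled()) // true
}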
@@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
 	}

 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	tx.UnsafeCreateBucket(schema.Key)
 	schema.UnsafeCreateMetaBucket(tx)
 	tx.Unlock()

@@ -331,7 +331,7 @@ func (s *store) restore() error {

 	// restore index
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()

 	finishedCompact, found := UnsafeReadFinishedCompact(tx)
 	if found {

@@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 	start := time.Now()

 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
 	for _, key := range keys {
 		rev = bytesToRev(key)
@@ -881,6 +881,8 @@ type fakeBatchTx struct {
 	rangeRespc chan rangeResp
 }

+func (b *fakeBatchTx) LockInsideApply()  {}
+func (b *fakeBatchTx) LockOutsideApply() {}
 func (b *fakeBatchTx) Lock()   {}
 func (b *fakeBatchTx) Unlock() {}
 func (b *fakeBatchTx) RLock()  {}

@@ -904,10 +906,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
 func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
 	return nil
 }
-func (b *fakeBatchTx) Commit()           {}
-func (b *fakeBatchTx) CommitAndStop()    {}
-func (b *fakeBatchTx) LockInsideApply()  {}
-func (b *fakeBatchTx) LockOutsideApply() {}
+func (b *fakeBatchTx) Commit()        {}
+func (b *fakeBatchTx) CommitAndStop() {}

 type fakeBackend struct {
 	tx *fakeBatchTx

@@ -924,6 +924,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
 func (b *fakeBackend) ForceCommit() {}
 func (b *fakeBackend) Defrag() error { return nil }
 func (b *fakeBackend) Close() error { return nil }
+func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}

 type indexGetResp struct {
 	rev revision
@@ -133,7 +133,7 @@ type storeTxnWrite struct {
 func (s *store) Write(trace *traceutil.Trace) TxnWrite {
 	s.mu.RLock()
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	tw := &storeTxnWrite{
 		storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
 		tx:           tx,
@@ -36,7 +36,7 @@ func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found
 }

 func SetScheduledCompact(tx backend.BatchTx, value int64) {
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	UnsafeSetScheduledCompact(tx, value)
 }

@@ -48,7 +48,7 @@ func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
 }

 func SetFinishedCompact(tx backend.BatchTx, value int64) {
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	UnsafeSetFinishedCompact(tx, value)
 }
@@ -34,14 +34,14 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend {

 func (s *alarmBackend) CreateAlarmBucket() {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Alarm)
 }

 func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	s.mustUnsafePutAlarm(tx, alarm)
 }

@@ -57,7 +57,7 @@ func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverp

 func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	s.mustUnsafeDeleteAlarm(tx, alarm)
 }
@@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend {

 func (abe *authBackend) CreateAuthBuckets() {
 	tx := abe.be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Auth)
 	tx.UnsafeCreateBucket(AuthUsers)

@@ -60,15 +60,25 @@ func (abe *authBackend) ForceCommit() {
 	abe.be.ForceCommit()
 }

+func (abe *authBackend) ReadTx() auth.AuthReadTx {
+	return &authReadTx{tx: abe.be.ReadTx(), lg: abe.lg}
+}
+
 func (abe *authBackend) BatchTx() auth.AuthBatchTx {
 	return &authBatchTx{tx: abe.be.BatchTx(), lg: abe.lg}
 }

+type authReadTx struct {
+	tx backend.ReadTx
+	lg *zap.Logger
+}
+
 type authBatchTx struct {
 	tx backend.BatchTx
 	lg *zap.Logger
 }

+var _ auth.AuthReadTx = (*authReadTx)(nil)
+var _ auth.AuthBatchTx = (*authBatchTx)(nil)
+
 func (atx *authBatchTx) UnsafeSaveAuthEnabled(enabled bool) {

@@ -86,6 +96,24 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) {
 }

+func (atx *authBatchTx) UnsafeReadAuthEnabled() bool {
+	arx := &authReadTx{tx: atx.tx, lg: atx.lg}
+	return arx.UnsafeReadAuthEnabled()
+}
+
+func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 {
+	arx := &authReadTx{tx: atx.tx, lg: atx.lg}
+	return arx.UnsafeReadAuthRevision()
+}
+
+func (atx *authBatchTx) Lock() {
+	atx.tx.LockInsideApply()
+}
+
+func (atx *authBatchTx) Unlock() {
+	atx.tx.Unlock()
+}
+
-func (atx *authBatchTx) UnsafeReadAuthEnabled() bool {
+func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
 	_, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
 	if len(vs) == 1 {
 		if bytes.Equal(vs[0], authEnabled) {

@@ -95,7 +123,7 @@ func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
 	return false
 }

-func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 {
+func (atx *authReadTx) UnsafeReadAuthRevision() uint64 {
 	_, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
 	if len(vs) != 1 {
 		// this can happen in the initialization phase

@@ -104,10 +132,10 @@ func (atx *authReadTx) UnsafeReadAuthRevision() uint64 {
 	return binary.BigEndian.Uint64(vs[0])
 }

-func (atx *authBatchTx) Lock() {
-	atx.tx.Lock()
+func (atx *authReadTx) Lock() {
+	atx.tx.RLock()
 }

-func (atx *authBatchTx) Unlock() {
-	atx.tx.Unlock()
+func (atx *authReadTx) Unlock() {
+	atx.tx.RUnlock()
 }
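The two `var _ auth.Auth...Tx = (*...)(nil)` lines added above are compile-time interface assertions. A standalone example of the idiom:

package main

import "fmt"

type Reader interface{ Read() string }

type fileReader struct{}

func (fileReader) Read() string { return "data" }

// Compilation fails here if fileReader ever stops satisfying Reader,
// which is exactly what the added assertions guard for authReadTx
// and authBatchTx.
var _ Reader = (*fileReader)(nil)

func main() { fmt.Println(fileReader{}.Read()) }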
@@ -32,17 +32,8 @@ func (abe *authBackend) GetRole(roleName string) *authpb.Role {
 }

 func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role {
-	_, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
-	if len(vs) == 0 {
-		return nil
-	}
-
-	role := &authpb.Role{}
-	err := role.Unmarshal(vs[0])
-	if err != nil {
-		atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
-	}
-	return role
+	arx := &authReadTx{tx: atx.tx, lg: atx.lg}
+	return arx.UnsafeGetRole(roleName)
 }

 func (abe *authBackend) GetAllRoles() []*authpb.Role {

@@ -53,21 +44,8 @@ func (abe *authBackend) GetAllRoles() []*authpb.Role {
 }

 func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role {
-	_, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
-	if len(vs) == 0 {
-		return nil
-	}
-
-	roles := make([]*authpb.Role, len(vs))
-	for i := range vs {
-		role := &authpb.Role{}
-		err := role.Unmarshal(vs[i])
-		if err != nil {
-			atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
-		}
-		roles[i] = role
-	}
-	return roles
+	arx := &authReadTx{tx: atx.tx, lg: atx.lg}
+	return arx.UnsafeGetAllRoles()
 }

 func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) {

@@ -86,3 +64,35 @@ func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) {
 func (atx *authBatchTx) UnsafeDeleteRole(rolename string) {
 	atx.tx.UnsafeDelete(AuthRoles, []byte(rolename))
 }
+
+func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role {
+	_, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0)
+	if len(vs) == 0 {
+		return nil
+	}
+
+	role := &authpb.Role{}
+	err := role.Unmarshal(vs[0])
+	if err != nil {
+		atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
+	}
+	return role
+}
+
+func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role {
+	_, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1)
+	if len(vs) == 0 {
+		return nil
+	}
+
+	roles := make([]*authpb.Role, len(vs))
+	for i := range vs {
+		role := &authpb.Role{}
+		err := role.Unmarshal(vs[i])
+		if err != nil {
+			atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err))
+		}
+		roles[i] = role
+	}
+	return roles
+}
@@ -27,6 +27,35 @@ func (abe *authBackend) GetUser(username string) *authpb.User {
 }

 func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User {
+	arx := &authReadTx{tx: atx.tx, lg: atx.lg}
+	return arx.UnsafeGetUser(username)
+}
+
+func (abe *authBackend) GetAllUsers() []*authpb.User {
+	tx := abe.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	return tx.UnsafeGetAllUsers()
+}
+
+func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User {
+	arx := &authReadTx{tx: atx.tx, lg: atx.lg}
+	return arx.UnsafeGetAllUsers()
+}
+
+func (atx *authBatchTx) UnsafePutUser(user *authpb.User) {
+	b, err := user.Marshal()
+	if err != nil {
+		atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
+	}
+	atx.tx.UnsafePut(AuthUsers, user.Name, b)
+}
+
+func (atx *authBatchTx) UnsafeDeleteUser(username string) {
+	atx.tx.UnsafeDelete(AuthUsers, []byte(username))
+}
+
+func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
 	_, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0)
 	if len(vs) == 0 {
 		return nil

@@ -44,14 +73,7 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User {
 	return user
 }

-func (abe *authBackend) GetAllUsers() []*authpb.User {
-	tx := abe.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	return tx.UnsafeGetAllUsers()
-}
-
-func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User {
+func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
 	_, vs := atx.tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1)
 	if len(vs) == 0 {
 		return nil

@@ -68,15 +90,3 @@ func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User {
 	}
 	return users
 }
-
-func (atx *authBatchTx) UnsafePutUser(user *authpb.User) {
-	b, err := user.Marshal()
-	if err != nil {
-		atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
-	}
-	atx.tx.UnsafePut(AuthUsers, user.Name, b)
-}
-
-func (atx *authBatchTx) UnsafeDeleteUser(username string) {
-	atx.tx.UnsafeDelete(AuthUsers, []byte(username))
-}
@@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {

 // CreateMetaBucket creates the `meta` bucket (if it does not exist yet).
 func CreateMetaBucket(tx backend.BatchTx) {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Meta)
 }

@@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
 // ReadConsistentIndex loads consistent index and term from given transaction.
 // returns 0 if the data are not found.
 func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
-	tx.Lock()
-	defer tx.Unlock()
+	tx.RLock()
+	defer tx.RUnlock()
 	return UnsafeReadConsistentIndex(tx)
 }
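Switching ReadConsistentIndex from Lock to RLock lets readers proceed concurrently; a small standalone illustration of the difference:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.RWMutex
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mu.RLock() // all four readers may hold this at once
			defer mu.RUnlock()
			fmt.Println("reader", id, "sees the consistent index")
		}(i)
	}
	wg.Wait() // with mu.Lock() instead, the readers would serialize
}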
@@ -52,7 +52,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) {
 	}

 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(Members, mkey, mvalue)
 }

@@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) {
 // from the v3 backend.
 func (s *membershipBackend) TrimClusterFromBackend() error {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeDeleteBucket(Cluster)
 	return nil

@@ -71,7 +71,7 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) {
 	mkey := BackendMemberKey(id)

 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafeDelete(Members, mkey)
 	tx.UnsafePut(MembersRemoved, mkey, []byte("removed"))

@@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M
 func (s *membershipBackend) TrimMembershipFromBackend() error {
 	s.lg.Info("Trimming membership information from the backend...")
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	err := tx.UnsafeForEach(Members, func(k, v []byte) error {
 		tx.UnsafeDelete(Members, k)

@@ -146,7 +146,7 @@ func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version)
 	ckey := ClusterClusterVersionKeyName

 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(Cluster, ckey, []byte(ver.String()))
 }

@@ -160,14 +160,14 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr
 		s.lg.Panic("failed to marshal downgrade information", zap.Error(err))
 	}
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(Cluster, dkey, dvalue)
 }

 func (s *membershipBackend) MustCreateBackendBuckets() {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Members)
 	tx.UnsafeCreateBucket(MembersRemoved)
@@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla
 }

 func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	return p.unsafeExecute(lg, tx)
 }

@@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange)

 // execute runs actions required to migrate etcd storage between two minor versions.
 func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	return s.unsafeExecute(lg, tx)
 }
@@ -30,13 +30,13 @@ var (
 )

 // Validate checks provided backend to confirm that schema used is supported.
-func Validate(lg *zap.Logger, tx backend.BatchTx) error {
+func Validate(lg *zap.Logger, tx backend.ReadTx) error {
 	tx.Lock()
 	defer tx.Unlock()
 	return unsafeValidate(lg, tx)
 }

-func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error {
+func unsafeValidate(lg *zap.Logger, tx backend.ReadTx) error {
 	current, err := UnsafeDetectSchemaVersion(lg, tx)
 	if err != nil {
 		// v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version.

@@ -60,7 +60,7 @@ type WALVersion interface {
 // Migrate updates storage schema to provided target version.
 // Downgrading requires that provided WAL doesn't contain unsupported entries.
 func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	return UnsafeMigrate(lg, tx, w, target)
 }

@@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv
 // * v3.5 will return its version if it includes all storage fields added in v3.5 (might require a snapshot).
 // * v3.4 and older is not supported and will return error.
 func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) {
-	tx.Lock()
-	defer tx.Unlock()
+	tx.RLock()
+	defer tx.RUnlock()
 	return UnsafeDetectSchemaVersion(lg, tx)
 }
|
@ -88,7 +88,7 @@ func TestValidate(t *testing.T) {
|
||||
|
||||
b := backend.NewDefaultBackend(lg, dataPath)
|
||||
defer b.Close()
|
||||
err := Validate(lg, b.BatchTx())
|
||||
err := Validate(lg, b.ReadTx())
|
||||
if (err != nil) != tc.expectError {
|
||||
t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError)
|
||||
}
|
||||
|
@@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) {
 }

 func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
-	tx := be.BatchTx()
-	index, term := schema.ReadConsistentIndex(tx)
+	index, term := schema.ReadConsistentIndex(be.ReadTx())
 	if cfg.ExactIndex && index != hardstate.Commit {
 		return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
 	}