rename LockWithoutHook to LockOutsideApply and add LockInsideApply

This commit is contained in:
ahrtr 2022-04-06 05:07:07 +08:00
parent 47038593e9
commit e155e50886
25 changed files with 106 additions and 71 deletions

View File

@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
if !v3 {
tx := be.BatchTx()
tx.Lock()
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)

View File

@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error {
}
func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
// Storage version is only supported since v3.6
if target.LessThan(schema.V3_6) {

View File

@ -374,7 +374,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
func (as *authStore) Recover(be AuthBackend) {
as.be = be
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()
enabled := tx.UnsafeReadAuthEnabled()
@ -939,7 +939,7 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i
be.CreateAuthBuckets()
tx := be.BatchTx()
// We should call LockWithoutHook here, but the txPostLockHoos isn't set
// We should call LockOutsideApply here, but the txPostLockHoos isn't set
// to EtcdServer yet, so it's OK.
tx.Lock()
enabled := tx.UnsafeReadAuthEnabled()

View File

@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error
defer s.bemu.RUnlock()
tx := s.be.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
}

View File

@ -231,7 +231,7 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s
}
}
if beExist {
err = schema.Validate(cfg.Logger, be.BatchTx())
err = schema.Validate(cfg.Logger, be.ReadTx())
if err != nil {
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
return nil, err

View File

@ -82,8 +82,6 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
return v
}
// UnsafeConsistentIndex is similar to ConsistentIndex,
// but it shouldn't lock the transaction.
func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
@ -134,7 +132,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}

View File

@ -405,7 +405,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// Set the hook after EtcdServer finishes the initialization to avoid
// the hook being called during the initialization process.
srv.be.SetTxPostLockHook(srv.getTxPostLockHook())
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockHook())
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
@ -984,7 +984,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
}
s.consistIndex.SetBackend(newbe)
newbe.SetTxPostLockHook(s.getTxPostLockHook())
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockHook())
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

View File

@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
schema.UnsafeCreateLeaseBucket(tx)
lpbs := schema.MustUnsafeGetAllLeases(tx)
tx.Unlock()
@ -845,7 +845,7 @@ func (l *Lease) expired() bool {
func (l *Lease) persistTo(b backend.Backend) {
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
tx := b.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
schema.MustUnsafePutLease(tx, &lpb)
}

View File

@ -68,8 +68,8 @@ type Backend interface {
ForceCommit()
Close() error
// SetTxPostLockHook sets a txPostLockHook.
SetTxPostLockHook(func())
// SetTxPostLockInsideApplyHook sets a txPostLockHook.
SetTxPostLockInsideApplyHook(func())
}
type Snapshot interface {
@ -233,10 +233,10 @@ func (b *backend) BatchTx() BatchTx {
return b.batchTx
}
func (b *backend) SetTxPostLockHook(hook func()) {
func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
// It needs to lock the batchTx, because the periodic commit
// may be accessing the txPostLockHook at the moment.
b.batchTx.LockWithoutHook()
b.batchTx.lock()
defer b.batchTx.Unlock()
b.txPostLockHook = hook
}
@ -452,7 +452,7 @@ func (b *backend) defrag() error {
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.LockWithoutHook()
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()
// lock database after lock tx to avoid deadlock.

View File

@ -65,25 +65,31 @@ type batchTx struct {
pending int
}
// Lock is supposed to be called only by the unit test.
func (t *batchTx) Lock() {
t.LockWithoutHook()
if t.backend.txPostLockHook != nil {
t.backend.txPostLockHook()
}
ValidateCalledInsideUnittest(t.backend.lg)
t.lock()
}
func (t *batchTx) LockWithoutHook() {
func (t *batchTx) lock() {
t.Mutex.Lock()
}
func (t *batchTx) LockInsideApply() {
ValidateCalledInsideApply(t.backend.lg)
t.Lock()
t.lock()
if t.backend.txPostLockHook != nil {
// The callers of some methods (i.e., (*RaftCluster).AddMember)
// can be coming from both InsideApply and OutsideApply, but the
// callers from OutsideApply will have a nil txPostLockHook. So we
// should check the txPostLockHook before validating the callstack.
ValidateCalledInsideApply(t.backend.lg)
t.backend.txPostLockHook()
}
}
func (t *batchTx) LockOutsideApply() {
ValidateCalledOutSideApply(t.backend.lg)
t.Lock()
t.lock()
}
func (t *batchTx) Unlock() {
@ -233,14 +239,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.LockWithoutHook()
t.lock()
t.commit(false)
t.Unlock()
}
// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.LockWithoutHook()
t.lock()
t.commit(true)
t.Unlock()
}
@ -310,13 +316,13 @@ func (t *batchTxBuffered) Unlock() {
}
func (t *batchTxBuffered) Commit() {
t.LockWithoutHook()
t.lock()
t.commit(false)
t.Unlock()
}
func (t *batchTxBuffered) CommitAndStop() {
t.LockWithoutHook()
t.lock()
t.commit(true)
t.Unlock()
}

View File

@ -41,8 +41,6 @@ func TestBackendPreCommitHook(t *testing.T) {
// Empty commit.
tx.Commit()
write(tx, []byte("foo"), []byte("bar"))
assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits")
tx.Commit()
assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits")

View File

@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) {
}
}
func ValidateCalledInsideUnittest(lg *zap.Logger) {
if !verifyLockEnabled() {
return
}
if !insideUnittest() {
lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace"))
}
}
func verifyLockEnabled() bool {
return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
}
@ -54,3 +63,8 @@ func insideApply() bool {
stackTraceStr := string(debug.Stack())
return strings.Contains(stackTraceStr, ".applyEntries")
}
func insideUnittest() bool {
stackTraceStr := string(debug.Stack())
return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
}

View File

@ -15,7 +15,6 @@
package backend_test
import (
"fmt"
"os"
"testing"
"time"
@ -26,40 +25,60 @@ import (
func TestLockVerify(t *testing.T) {
tcs := []struct {
insideApply bool
lock func(tx backend.BatchTx)
expectPanic bool
name string
insideApply bool
lock func(tx backend.BatchTx)
txPostLockHook func()
expectPanic bool
}{
{
name: "call lockInsideApply from inside apply",
insideApply: true,
lock: lockInsideApply,
expectPanic: false,
},
{
name: "call lockInsideApply from outside apply (without txPostLockHook)",
insideApply: false,
lock: lockInsideApply,
expectPanic: true,
expectPanic: false,
},
{
name: "call lockInsideApply from outside apply (with txPostLockHook)",
insideApply: false,
lock: lockInsideApply,
txPostLockHook: func() {},
expectPanic: true,
},
{
name: "call lockOutsideApply from outside apply",
insideApply: false,
lock: lockOutsideApply,
expectPanic: false,
},
{
name: "call lockOutsideApply from inside apply",
insideApply: true,
lock: lockOutsideApply,
expectPanic: true,
},
{
name: "call Lock from unit test",
insideApply: false,
lock: lockFromUT,
expectPanic: false,
},
}
env := os.Getenv("ETCD_VERIFY")
os.Setenv("ETCD_VERIFY", "lock")
defer func() {
os.Setenv("ETCD_VERIFY", env)
}()
for i, tc := range tcs {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
be.SetTxPostLockInsideApplyHook(tc.txPostLockHook)
hasPaniced := handlePanic(func() {
if tc.insideApply {
@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) {
func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() }
func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() }
func lockFromUT(tx backend.BatchTx) { tx.Lock() }

View File

@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
}
tx := s.b.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(schema.Key)
schema.UnsafeCreateMetaBucket(tx)
tx.Unlock()
@ -331,7 +331,7 @@ func (s *store) restore() error {
// restore index
tx := s.b.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
finishedCompact, found := UnsafeReadFinishedCompact(tx)
if found {

View File

@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
start := time.Now()
tx := s.b.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for _, key := range keys {
rev = bytesToRev(key)

View File

@ -881,7 +881,8 @@ type fakeBatchTx struct {
rangeRespc chan rangeResp
}
func (b *fakeBatchTx) LockWithoutHook() {}
func (b *fakeBatchTx) LockInsideApply() {}
func (b *fakeBatchTx) LockOutsideApply() {}
func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {}
@ -905,10 +906,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
return nil
}
func (b *fakeBatchTx) Commit() {}
func (b *fakeBatchTx) CommitAndStop() {}
func (b *fakeBatchTx) LockInsideApply() {}
func (b *fakeBatchTx) LockOutsideApply() {}
func (b *fakeBatchTx) Commit() {}
func (b *fakeBatchTx) CommitAndStop() {}
type fakeBackend struct {
tx *fakeBatchTx
@ -925,7 +924,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockHook(func()) {}
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
type indexGetResp struct {
rev revision

View File

@ -133,7 +133,7 @@ type storeTxnWrite struct {
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.Lock()
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,

View File

@ -36,7 +36,7 @@ func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found
}
func SetScheduledCompact(tx backend.BatchTx, value int64) {
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
UnsafeSetScheduledCompact(tx, value)
}
@ -48,7 +48,7 @@ func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
}
func SetFinishedCompact(tx backend.BatchTx, value int64) {
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
UnsafeSetFinishedCompact(tx, value)
}

View File

@ -34,14 +34,14 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend {
func (s *alarmBackend) CreateAlarmBucket() {
tx := s.be.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(Alarm)
}
func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) {
tx := s.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
s.mustUnsafePutAlarm(tx, alarm)
}
@ -57,7 +57,7 @@ func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverp
func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) {
tx := s.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
s.mustUnsafeDeleteAlarm(tx, alarm)
}

View File

@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend {
func (abe *authBackend) CreateAuthBuckets() {
tx := abe.be.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(Auth)
tx.UnsafeCreateBucket(AuthUsers)
@ -106,7 +106,7 @@ func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 {
}
func (atx *authBatchTx) Lock() {
atx.tx.Lock()
atx.tx.LockInsideApply()
}
func (atx *authBatchTx) Unlock() {

View File

@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(Meta)
}

View File

@ -52,7 +52,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) {
}
tx := s.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(Members, mkey, mvalue)
}
@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) {
// from the v3 backend.
func (s *membershipBackend) TrimClusterFromBackend() error {
tx := s.be.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeDeleteBucket(Cluster)
return nil
@ -71,7 +71,7 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) {
mkey := BackendMemberKey(id)
tx := s.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafeDelete(Members, mkey)
tx.UnsafePut(MembersRemoved, mkey, []byte("removed"))
@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M
func (s *membershipBackend) TrimMembershipFromBackend() error {
s.lg.Info("Trimming membership information from the backend...")
tx := s.be.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
err := tx.UnsafeForEach(Members, func(k, v []byte) error {
tx.UnsafeDelete(Members, k)
@ -146,7 +146,7 @@ func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version)
ckey := ClusterClusterVersionKeyName
tx := s.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(Cluster, ckey, []byte(ver.String()))
}
@ -160,14 +160,14 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr
s.lg.Panic("failed to marshal downgrade information", zap.Error(err))
}
tx := s.be.BatchTx()
tx.Lock()
tx.LockInsideApply()
defer tx.Unlock()
tx.UnsafePut(Cluster, dkey, dvalue)
}
func (s *membershipBackend) MustCreateBackendBuckets() {
tx := s.be.BatchTx()
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
tx.UnsafeCreateBucket(Members)
tx.UnsafeCreateBucket(MembersRemoved)

View File

@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla
}
func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error {
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
return p.unsafeExecute(lg, tx)
}
@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange)
// execute runs actions required to migrate etcd storage between two minor versions.
func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error {
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
return s.unsafeExecute(lg, tx)
}

View File

@ -30,13 +30,13 @@ var (
)
// Validate checks provided backend to confirm that schema used is supported.
func Validate(lg *zap.Logger, tx backend.BatchTx) error {
tx.LockWithoutHook()
func Validate(lg *zap.Logger, tx backend.ReadTx) error {
tx.Lock()
defer tx.Unlock()
return unsafeValidate(lg, tx)
}
func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error {
func unsafeValidate(lg *zap.Logger, tx backend.ReadTx) error {
current, err := UnsafeDetectSchemaVersion(lg, tx)
if err != nil {
// v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version.
@ -60,7 +60,7 @@ type WALVersion interface {
// Migrate updates storage schema to provided target version.
// Downgrading requires that provided WAL doesn't contain unsupported entries.
func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
tx.LockWithoutHook()
tx.LockOutsideApply()
defer tx.Unlock()
return UnsafeMigrate(lg, tx, w, target)
}

View File

@ -88,7 +88,7 @@ func TestValidate(t *testing.T) {
b := backend.NewDefaultBackend(lg, dataPath)
defer b.Close()
err := Validate(lg, b.BatchTx())
err := Validate(lg, b.ReadTx())
if (err != nil) != tc.expectError {
t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError)
}