fix the data inconsistency issue by adding a txPostLockHook into the backend
Previously, SetConsistentIndex() was called during the apply workflow, but outside the db transaction. If a commit happened between SetConsistentIndex and the following apply workflow, and etcd crashed for whatever reason right after the commit, then etcd committed an incomplete transaction to db and eventually ran into the data inconsistency issue. In this commit, we move SetConsistentIndex into a txPostLockHook, so it is executed inside the transaction lock.
This commit is contained in:
parent 3ace622792
commit 66c7aab4d3
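The diff below touches every call site that takes the backend batch-transaction lock: the single Lock() entry point is split into LockInsideApply(), LockOutsideApply(), and a unit-test-only Lock(), and the consistent index is persisted by a hook that runs right after the lock is acquired on the apply path. As a rough, self-contained sketch of that pattern (the types and names below are invented for illustration and are not etcd's actual API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// simpleTx mimics a batch transaction whose lock triggers a post-lock hook.
type simpleTx struct {
	mu           sync.Mutex
	postLockHook func() // runs right after the lock is taken inside apply
	pending      map[string]uint64
}

func (t *simpleTx) LockInsideApply() {
	t.mu.Lock()
	if t.postLockHook != nil {
		t.postLockHook() // persist the cached index inside the same tx
	}
}

func (t *simpleTx) Unlock() { t.mu.Unlock() }

func main() {
	var applyingIndex, persistedIndex uint64

	tx := &simpleTx{pending: map[string]uint64{}}
	tx.postLockHook = func() {
		// Move the cached applying index into the persisted slot only while
		// the tx lock is held, loosely mirroring getTxPostLockInsideApplyHook.
		if idx := atomic.LoadUint64(&applyingIndex); idx > persistedIndex {
			persistedIndex = idx
			tx.pending["consistent_index"] = idx
		}
	}

	// Apply entry 7: cache the index first, then write under the tx lock.
	atomic.StoreUint64(&applyingIndex, 7)
	tx.LockInsideApply()
	tx.pending["key"] = 7 // the actual apply write
	tx.Unlock()

	fmt.Println("persisted consistent index:", persistedIndex) // 7
}

Because the hook writes the cached index while the same transaction lock is held, a periodic commit can no longer slip in between updating the consistent index and applying the entry's writes.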
@@ -319,7 +319,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
 	if !v3 {
 		tx := be.BatchTx()
-		tx.Lock()
+		tx.LockOutsideApply()
 		defer tx.Unlock()
 		cindex.UnsafeCreateMetaBucket(tx)
 		cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
@@ -22,7 +22,7 @@ import (
 	"go.uber.org/zap"
 )
 
-func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
+func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions {
 	user := getUser(lg, tx, userName)
 	if user == nil {
 		return nil
@@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b
 	return false
 }
 
-func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
+func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool {
 	// assumption: tx is Lock()ed
 	_, ok := as.rangePermCache[userName]
 	if !ok {
@@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error {
 	}
 	b := as.be
 	tx := b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer func() {
 		tx.Unlock()
 		b.ForceCommit()
@@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() {
 	}
 	b := as.be
 	tx := b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
 	as.commitRevision(tx)
 	tx.Unlock()
@@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, username)
@@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
 	// CompareHashAndPassword is very expensive, so we use closures
 	// to avoid putting it in the critical section of the tx lock.
 	revision, err := func() (uint64, error) {
-		tx := as.be.BatchTx()
+		tx := as.be.ReadTx()
 		tx.Lock()
 		defer tx.Unlock()
 
@@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
 func (as *authStore) Recover(be backend.Backend) {
 	enabled := false
 	as.be = be
-	tx := be.BatchTx()
+	tx := be.ReadTx()
 	tx.Lock()
 	_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
 	if len(vs) == 1 {
@@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
 
 func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
 
 func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.User)
@@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
 
 func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	user := getUser(as.lg, tx, r.Name)
 	tx.Unlock()
 
@@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,
 
 func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	users := getAllUsers(as.lg, tx)
 	tx.Unlock()
 
@@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	user := getUser(as.lg, tx, r.Name)
@@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
 
 func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	var resp pb.AuthRoleGetResponse
@@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,
 
 func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	roles := getAllRoles(as.lg, tx)
 	tx.Unlock()
 
@@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon
 
 func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Role)
@@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Role)
@@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Name)
@@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
 	}
 
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 
 	role := getRole(as.lg, tx, r.Name)
@@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
 		return ErrAuthOldRevision
 	}
 
-	tx := as.be.BatchTx()
+	tx := as.be.ReadTx()
 	tx.Lock()
 	defer tx.Unlock()
 
@@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
 		return ErrUserEmpty
 	}
 
-	tx := as.be.BatchTx()
+	tx := as.be.ReadTx()
 	tx.Lock()
 	u := getUser(as.lg, tx, authInfo.Username)
 	tx.Unlock()
@@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
 	return nil
 }
 
-func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
+func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User {
 	_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
 	if len(vs) == 0 {
 		return nil
@@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
 	return user
 }
 
-func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
+func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User {
 	_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
 	if len(vs) == 0 {
 		return nil
@@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) {
 	tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
 }
 
-func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
+func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role {
 	_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
 	if len(vs) == 0 {
 		return nil
@@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
 	return role
 }
 
-func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
+func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role {
 	_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
 	if len(vs) == 0 {
 		return nil
@@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
 	}
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 
 	tx.UnsafeCreateBucket(buckets.Auth)
 	tx.UnsafeCreateBucket(buckets.AuthUsers)
@@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
 	tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
 }
 
-func getRevision(tx backend.BatchTx) uint64 {
+func getRevision(tx backend.ReadTx) uint64 {
 	_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
 	if len(vs) != 1 {
 		// this can happen in the initialization phase
@@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {
 
 func (as *authStore) HasRole(user, role string) bool {
 	tx := as.be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	u := getUser(as.lg, tx, user)
 	tx.Unlock()
 
@@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
 	}
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	if unsafeMemberExists(tx, mkey) {
 		return errMemberAlreadyExist
@@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er
 // from the v3 backend.
 func TrimClusterFromBackend(be backend.Backend) error {
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeDeleteBucket(buckets.Cluster)
 	return nil
@@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
 	mkey := backendMemberKey(id)
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
 	if !unsafeMemberExists(tx, mkey) {
@@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I
 func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
 	lg.Info("Trimming membership information from the backend...")
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
 		tx.UnsafeDelete(buckets.Members, k)
@@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
 	ckey := backendClusterVersionKey()
 
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
 }
@@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
 		lg.Panic("failed to marshal downgrade information", zap.Error(err))
 	}
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	defer tx.Unlock()
 	tx.UnsafePut(buckets.Cluster, dkey, dvalue)
 }
@@ -316,7 +316,7 @@ func backendDowngradeKey() []byte {
 
 func mustCreateBackendBuckets(be backend.Backend) {
 	tx := be.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(buckets.Members)
 	tx.UnsafeCreateBucket(buckets.MembersRemoved)
@@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
 	}
 
 	b := a.bg.Backend()
-	b.BatchTx().Lock()
+	b.BatchTx().LockInsideApply()
 	b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
 	b.BatchTx().Unlock()
 
@@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
 	}
 
 	b := a.bg.Backend()
-	b.BatchTx().Lock()
+	b.BatchTx().LockInsideApply()
 	b.BatchTx().UnsafeDelete(buckets.Alarm, v)
 	b.BatchTx().Unlock()
 
@@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error {
 	b := a.bg.Backend()
 	tx := b.BatchTx()
 
-	tx.Lock()
+	tx.LockOutsideApply()
 	tx.UnsafeCreateBucket(buckets.Alarm)
 	err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
 		var m pb.AlarmMember
@@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
 func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
 	consistentIndex := uint64(0)
 	if beExist {
-		consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
+		consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
 	}
 	if snapshot.Metadata.Index <= consistentIndex {
 		return oldbe, nil
@@ -34,9 +34,18 @@ type ConsistentIndexer interface {
 	// ConsistentIndex returns the consistent index of current executing entry.
 	ConsistentIndex() uint64
 
+	// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
+	ConsistentApplyingIndex() (uint64, uint64)
+
+	// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
+	UnsafeConsistentIndex() uint64
+
 	// SetConsistentIndex set the consistent index of current executing entry.
 	SetConsistentIndex(v uint64, term uint64)
 
+	// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
+	SetConsistentApplyingIndex(v uint64, term uint64)
+
 	// UnsafeSave must be called holding the lock on the tx.
 	// It saves consistentIndex to the underlying stable storage.
 	UnsafeSave(tx backend.BatchTx)
@@ -56,6 +65,12 @@ type consistentIndex struct {
 	// The value is being persisted in the backend since v3.5.
 	term uint64
 
+	// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
+	// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
+	// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
+	applyingIndex uint64
+	applyingTerm  uint64
+
 	// be is used for initial read consistentIndex
 	be Backend
 	// mutex is protecting be.
@@ -75,7 +90,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
 
-	v, term := ReadConsistentIndex(ci.be.BatchTx())
+	v, term := ReadConsistentIndex(ci.be.ReadTx())
 	ci.SetConsistentIndex(v, term)
 	return v
 }
 
+func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
+	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
+		return index
+	}
+
+	v, term := unsafeReadConsistentIndex(ci.be.ReadTx())
+	ci.SetConsistentIndex(v, term)
+	return v
+}
@@ -99,6 +124,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
 	ci.SetConsistentIndex(0, 0)
 }
 
+func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
+	return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
+}
+
+func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
+	atomic.StoreUint64(&ci.applyingIndex, v)
+	atomic.StoreUint64(&ci.applyingTerm, term)
+}
+
 func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
 	return &fakeConsistentIndex{index: index}
 }
@@ -108,12 +142,24 @@ type fakeConsistentIndex struct {
 	term uint64
 }
 
-func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
+func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
+	return atomic.LoadUint64(&f.index)
+}
+func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
+	return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
+}
+func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
+	return atomic.LoadUint64(&f.index)
+}
 
 func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
 	atomic.StoreUint64(&f.index, index)
 	atomic.StoreUint64(&f.term, term)
 }
+func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
+	atomic.StoreUint64(&f.index, index)
+	atomic.StoreUint64(&f.term, term)
+}
 
 func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
 func (f *fakeConsistentIndex) SetBackend(_ Backend)         {}
@@ -125,7 +171,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {
 
 // CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
 func CreateMetaBucket(tx backend.BatchTx) {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(buckets.Meta)
 }
@@ -174,7 +220,7 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64)
 }
 
 func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
-	tx.Lock()
+	tx.LockOutsideApply()
 	defer tx.Unlock()
 	UnsafeUpdateConsistentIndex(tx, index, term)
 }
@@ -661,6 +661,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		})
 	}
 
+	// Set the hook after EtcdServer finishes the initialization to avoid
+	// the hook being called during the initialization process.
+	srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
+
 	// TODO: move transport initialization near the definition of remote
 	tr := &rafthttp.Transport{
 		Logger: cfg.Logger,
@@ -1260,6 +1264,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 
 	s.consistIndex.SetBackend(newbe)
+	newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
 	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
 
 	// Closing old backend might block until all the txns
@@ -2128,7 +2133,7 @@ func (s *EtcdServer) apply(
 
 		// set the consistent index of current executing entry
 		if e.Index > s.consistIndex.ConsistentIndex() {
-			s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+			s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
 			shouldApplyV3 = membership.ApplyBoth
 		}
 
@@ -2155,10 +2160,18 @@ func (s *EtcdServer) apply(
 // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	shouldApplyV3 := membership.ApplyV2storeOnly
+	applyV3Performed := false
+	defer func() {
+		// The txPostLock callback will not get called in this case,
+		// so we should set the consistent index directly.
+		if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 {
+			s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		}
+	}()
 	index := s.consistIndex.ConsistentIndex()
 	if e.Index > index {
 		// set the consistent index of current executing entry
-		s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
 		shouldApplyV3 = membership.ApplyBoth
 	}
 	s.lg.Debug("apply entry normal",
@@ -2207,6 +2220,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		if !needResult && raftReq.Txn != nil {
 			removeNeedlessRangeReqs(raftReq.Txn)
 		}
+		applyV3Performed = true
 		ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
 	}
 
@@ -2258,6 +2272,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		s.r.ApplyConfChange(cc)
+
+		// The txPostLock callback will not get called in this case,
+		// so we should set the consistent index directly.
+		if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
+			applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+			s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
+		}
 		return false, err
 	}
 
@@ -2683,6 +2704,15 @@ func (s *EtcdServer) raftStatus() raft.Status {
 	return s.r.Node.Status()
 }
 
+func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
+	return func() {
+		applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
+		if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
+			s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
+		}
+	}
+}
+
 func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 	size := be.Size()
 	sizeInUse := be.SizeInUse()
@@ -686,9 +686,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
 
 	_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
 	consistIndex := srv.consistIndex.ConsistentIndex()
-	if consistIndex != appliedi {
-		t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
-	}
 	assert.Equal(t, uint64(2), appliedi)
 
 	t.Run("verify-backend", func(t *testing.T) {
 		tx := be.BatchTx()
@@ -697,9 +695,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
 		srv.beHooks.OnPreCommitUnsafe(tx)
 		assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
 	})
-	rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
+	rindex, _ := cindex.ReadConsistentIndex(be.ReadTx())
 	assert.Equal(t, consistIndex, rindex)
-	assert.Equal(t, uint64(4), rterm)
 }
 
 func realisticRaftNode(lg *zap.Logger) *raftNode {
@@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
 
 func (le *lessor) initAndRecover() {
 	tx := le.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 
 	tx.UnsafeCreateBucket(buckets.Lease)
 	lpbs := unsafeGetAllLeases(tx)
@@ -852,7 +852,7 @@ func (l *Lease) persistTo(b backend.Backend) {
 		panic("failed to marshal lease proto item")
 	}
 
-	b.BatchTx().Lock()
+	b.BatchTx().LockInsideApply()
 	b.BatchTx().UnsafePut(buckets.Lease, key, val)
 	b.BatchTx().Unlock()
 }
@@ -68,6 +68,9 @@ type Backend interface {
 	Defrag() error
 	ForceCommit()
 	Close() error
+
+	// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
+	SetTxPostLockInsideApplyHook(func())
 }
 
 type Snapshot interface {
@@ -120,6 +123,9 @@ type backend struct {
 
 	hooks Hooks
 
+	// txPostLockInsideApplyHook is called each time right after locking the tx.
+	txPostLockInsideApplyHook func()
+
 	lg *zap.Logger
 }
 
@@ -231,6 +237,14 @@ func (b *backend) BatchTx() BatchTx {
 	return b.batchTx
 }
 
+func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
+	// It needs to lock the batchTx, because the periodic commit
+	// may be accessing the txPostLockInsideApplyHook at the moment.
+	b.batchTx.lock()
+	defer b.batchTx.Unlock()
+	b.txPostLockInsideApplyHook = hook
+}
+
 func (b *backend) ReadTx() ReadTx { return b.readTx }
 
 // ConcurrentReadTx creates and returns a new ReadTx, which:
@@ -440,7 +454,7 @@ func (b *backend) defrag() error {
 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then
 	// close previous ongoing tx.
-	b.batchTx.Lock()
+	b.batchTx.LockOutsideApply()
 	defer b.batchTx.Unlock()
 
 	// lock database after lock tx to avoid deadlock.
@@ -65,18 +65,32 @@ type batchTx struct {
 	pending int
 }
 
+// Lock is supposed to be called only by the unit test.
 func (t *batchTx) Lock() {
+	ValidateCalledInsideUnittest(t.backend.lg)
+	t.lock()
+}
+
+func (t *batchTx) lock() {
 	t.Mutex.Lock()
 }
 
 func (t *batchTx) LockInsideApply() {
-	ValidateCalledInsideApply(t.backend.lg)
-	t.Lock()
+	t.lock()
+	if t.backend.txPostLockInsideApplyHook != nil {
+		// The callers of some methods (i.e., (*RaftCluster).AddMember)
+		// can be coming from both InsideApply and OutsideApply, but the
+		// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
+		// So we should check the txPostLockInsideApplyHook before validating
+		// the callstack.
+		ValidateCalledInsideApply(t.backend.lg)
+		t.backend.txPostLockInsideApplyHook()
+	}
 }
 
 func (t *batchTx) LockOutsideApply() {
 	ValidateCalledOutSideApply(t.backend.lg)
-	t.Lock()
+	t.lock()
 }
 
 func (t *batchTx) Unlock() {
@@ -226,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)
 
 // Commit commits a previous tx and begins a new writable one.
 func (t *batchTx) Commit() {
-	t.Lock()
+	t.lock()
 	t.commit(false)
 	t.Unlock()
}
 
 // CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
-	t.Lock()
+	t.lock()
 	t.commit(true)
 	t.Unlock()
 }
@@ -303,13 +317,13 @@ func (t *batchTxBuffered) Unlock() {
 }
 
 func (t *batchTxBuffered) Commit() {
-	t.Lock()
+	t.lock()
 	t.commit(false)
 	t.Unlock()
 }
 
 func (t *batchTxBuffered) CommitAndStop() {
-	t.Lock()
+	t.lock()
 	t.commit(true)
 	t.Unlock()
 }
@@ -40,8 +40,6 @@ func TestBackendPreCommitHook(t *testing.T) {
 	// Empty commit.
 	tx.Commit()
 
-	write(tx, []byte("foo"), []byte("bar"))
-
 	assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits")
 	tx.Commit()
 	assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits")
|
@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) {
|
||||
}
|
||||
}
|
||||
|
||||
func ValidateCalledInsideUnittest(lg *zap.Logger) {
|
||||
if !verifyLockEnabled() {
|
||||
return
|
||||
}
|
||||
if !insideUnittest() {
|
||||
lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace"))
|
||||
}
|
||||
}
|
||||
|
||||
func verifyLockEnabled() bool {
|
||||
return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
|
||||
}
|
||||
@ -54,3 +63,8 @@ func insideApply() bool {
|
||||
stackTraceStr := string(debug.Stack())
|
||||
return strings.Contains(stackTraceStr, ".applyEntries")
|
||||
}
|
||||
|
||||
func insideUnittest() bool {
|
||||
stackTraceStr := string(debug.Stack())
|
||||
return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/")
|
||||
}
|
||||
|
@@ -15,7 +15,6 @@
 package backend_test
 
 import (
-	"fmt"
 	"os"
 	"testing"
 	"time"
@@ -26,40 +25,60 @@ import (
 
 func TestLockVerify(t *testing.T) {
 	tcs := []struct {
-		insideApply bool
-		lock        func(tx backend.BatchTx)
-		expectPanic bool
+		name                      string
+		insideApply               bool
+		lock                      func(tx backend.BatchTx)
+		txPostLockInsideApplyHook func()
+		expectPanic               bool
 	}{
 		{
+			name:        "call lockInsideApply from inside apply",
 			insideApply: true,
 			lock:        lockInsideApply,
 			expectPanic: false,
 		},
 		{
+			name:        "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)",
 			insideApply: false,
 			lock:        lockInsideApply,
-			expectPanic: true,
+			expectPanic: false,
 		},
 		{
+			name:                      "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)",
+			insideApply:               false,
+			lock:                      lockInsideApply,
+			txPostLockInsideApplyHook: func() {},
+			expectPanic:               true,
+		},
+		{
+			name:        "call lockOutsideApply from outside apply",
 			insideApply: false,
 			lock:        lockOutsideApply,
 			expectPanic: false,
 		},
 		{
+			name:        "call lockOutsideApply from inside apply",
 			insideApply: true,
 			lock:        lockOutsideApply,
 			expectPanic: true,
 		},
+		{
+			name:        "call Lock from unit test",
+			insideApply: false,
+			lock:        lockFromUT,
+			expectPanic: false,
+		},
 	}
 	env := os.Getenv("ETCD_VERIFY")
 	os.Setenv("ETCD_VERIFY", "lock")
 	defer func() {
 		os.Setenv("ETCD_VERIFY", env)
 	}()
-	for i, tc := range tcs {
-		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
 
 			be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
+			be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook)
 
 			hasPaniced := handlePanic(func() {
 				if tc.insideApply {
@@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) {
 
 func lockInsideApply(tx backend.BatchTx)  { tx.LockInsideApply() }
 func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() }
+func lockFromUT(tx backend.BatchTx)       { tx.Lock() }
@@ -119,7 +119,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
 	}
 
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	tx.UnsafeCreateBucket(buckets.Key)
 	tx.UnsafeCreateBucket(buckets.Meta)
 	tx.Unlock()
@@ -238,7 +238,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
 	revToBytes(revision{main: rev}, rbytes)
 
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
 	tx.Unlock()
 	// ensure that desired compaction is persisted
@@ -334,7 +334,7 @@ func (s *store) restore() error {
 	keyToLease := make(map[string]lease.LeaseID)
 
 	// restore index
-	tx := s.b.BatchTx()
+	tx := s.b.ReadTx()
 	tx.Lock()
 
 	_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
@@ -39,7 +39,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 	start := time.Now()
 
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockOutsideApply()
 	keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
 	for _, key := range keys {
 		rev = bytesToRev(key)
@@ -871,6 +871,8 @@ type fakeBatchTx struct {
 	rangeRespc chan rangeResp
 }
 
+func (b *fakeBatchTx) LockInsideApply()  {}
+func (b *fakeBatchTx) LockOutsideApply() {}
 func (b *fakeBatchTx) Lock()   {}
 func (b *fakeBatchTx) Unlock() {}
 func (b *fakeBatchTx) RLock()  {}
@@ -894,10 +896,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
 func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
 	return nil
 }
-func (b *fakeBatchTx) Commit()           {}
-func (b *fakeBatchTx) CommitAndStop()    {}
-func (b *fakeBatchTx) LockInsideApply()  {}
-func (b *fakeBatchTx) LockOutsideApply() {}
+func (b *fakeBatchTx) Commit()        {}
+func (b *fakeBatchTx) CommitAndStop() {}
 
 type fakeBackend struct {
 	tx *fakeBatchTx
@@ -914,6 +914,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
 func (b *fakeBackend) ForceCommit() {}
 func (b *fakeBackend) Defrag() error { return nil }
 func (b *fakeBackend) Close() error { return nil }
+func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
 
 type indexGetResp struct {
 	rev revision
@@ -78,7 +78,7 @@ type storeTxnWrite struct {
 func (s *store) Write(trace *traceutil.Trace) TxnWrite {
 	s.mu.RLock()
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockInsideApply()
 	tw := &storeTxnWrite{
 		storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
 		tx:           tx,
@@ -31,7 +31,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
 		panic(fmt.Errorf("cannot marshal event: %v", err))
 	}
 
-	be.BatchTx().Lock()
+	be.BatchTx().LockOutsideApply()
 	be.BatchTx().UnsafePut(buckets.Key, ibytes, d)
 	be.BatchTx().Unlock()
 }
@@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) {
 }
 
 func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
-	tx := be.BatchTx()
-	index, term := cindex.ReadConsistentIndex(tx)
+	index, term := cindex.ReadConsistentIndex(be.ReadTx())
 	if cfg.ExactIndex && index != hardstate.Commit {
 		return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
 	}