add a txPostLockHook into the backend
Previously, SetConsistentIndex() was called during the apply workflow, but outside the db transaction. If a commit happened between SetConsistentIndex() and the rest of the apply workflow, and etcd crashed for whatever reason right after the commit, then etcd committed an incomplete transaction to the db and eventually ran into a data inconsistency issue. This commit moves SetConsistentIndex() into a txPostLockHook, so it is executed inside the transaction lock.
parent c4d055fe7b
commit bfd5170f66
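The core of the change is a post-lock hook on the batch transaction: Lock() first acquires the underlying mutex and then runs the hook, so the state the hook writes (here, the consistent index) is covered by the same lock as the rest of the transaction, while internal paths (commit, defrag, restore) take the mutex via LockWithoutHook() and never re-enter the hook. Below is a minimal, self-contained sketch of that pattern; the type and field names are simplified from the diff that follows, not the full etcd implementation:

    package main

    import (
        "fmt"
        "sync"
    )

    // backend owns the hook that must run whenever the batch tx is locked.
    // (Simplified sketch; the real etcd backend carries much more state.)
    type backend struct {
        txPostLockHook func()
        batchTx        *batchTx
    }

    type batchTx struct {
        sync.Mutex
        backend *backend
    }

    // Lock acquires the mutex and then runs the post-lock hook, so whatever
    // the hook persists lands inside the same transaction boundary.
    func (t *batchTx) Lock() {
        t.LockWithoutHook()
        if t.backend.txPostLockHook != nil {
            t.backend.txPostLockHook()
        }
    }

    // LockWithoutHook is for internal paths (commit, defrag, restore)
    // that must not trigger the hook.
    func (t *batchTx) LockWithoutHook() {
        t.Mutex.Lock()
    }

    func main() {
        b := &backend{}
        b.batchTx = &batchTx{backend: b}
        b.txPostLockHook = func() {
            fmt.Println("save consistent index inside the tx lock")
        }

        b.batchTx.Lock() // hook runs right after the lock is acquired
        // ... buffered writes of the apply workflow ...
        b.batchTx.Unlock()
    }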
@@ -118,7 +118,7 @@ type migrateConfig struct {
 func migrateCommandFunc(c *migrateConfig) error {
 	defer c.be.Close()
 	tx := c.be.BatchTx()
-	current, err := schema.DetectSchemaVersion(c.lg, tx)
+	current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
 	if err != nil {
 		c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
 		return err
@@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error {
 	}

 func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	// Storage version is only supported since v3.6
 	if target.LessThan(schema.V3_6) {
@@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
 	s.bemu.RLock()
 	defer s.bemu.RUnlock()

-	tx := s.be.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
+	tx := s.be.ReadTx()
+	tx.RLock()
+	defer tx.RUnlock()
 	v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
 	if err != nil {
 		return nil
@@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error
 	defer s.bemu.RUnlock()

 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
 }
@@ -23,6 +23,7 @@ import (
 )

 type Backend interface {
+	ReadTx() backend.ReadTx
 	BatchTx() backend.BatchTx
 }

@@ -32,6 +33,9 @@ type ConsistentIndexer interface {
 	// ConsistentIndex returns the consistent index of current executing entry.
 	ConsistentIndex() uint64

+	// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
+	UnsafeConsistentIndex() uint64
+
 	// SetConsistentIndex set the consistent index of current executing entry.
 	SetConsistentIndex(v uint64, term uint64)

@@ -73,7 +77,19 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()

-	v, term := schema.ReadConsistentIndex(ci.be.BatchTx())
+	v, term := schema.ReadConsistentIndex(ci.be.ReadTx())
+	ci.SetConsistentIndex(v, term)
+	return v
+}
+
+// UnsafeConsistentIndex is similar to ConsistentIndex,
+// but it shouldn't lock the transaction.
+func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
+	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
+		return index
+	}
+
+	v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx())
 	ci.SetConsistentIndex(v, term)
 	return v
 }
@@ -106,7 +122,8 @@ type fakeConsistentIndex struct {
 	term  uint64
 }

-func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
+func (f *fakeConsistentIndex) ConsistentIndex() uint64       { return f.index }
+func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index }

 func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
 	atomic.StoreUint64(&f.index, index)
@@ -117,7 +134,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
 func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

 func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
 }
@@ -207,8 +207,10 @@ type EtcdServer struct {
 	term uint64 // must use atomic operations to access; keep 64-bit aligned.
 	lead uint64 // must use atomic operations to access; keep 64-bit aligned.

-	consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
-	r            raftNode                 // uses 64-bit atomics; keep 64-bit aligned.
+	consistentIdx  uint64                   // must use atomic operations to access; keep 64-bit aligned.
+	consistentTerm uint64                   // must use atomic operations to access; keep 64-bit aligned.
+	consistIndex   cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
+	r              raftNode                 // uses 64-bit atomics; keep 64-bit aligned.

 	readych chan struct{}
 	Cfg     config.ServerConfig
@@ -341,6 +343,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

 	srv.be = b.storage.backend.be
+	srv.be.SetTxPostLockHook(srv.getTxPostLockHook())
 	srv.beHooks = b.storage.backend.beHooks
 	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

@@ -978,6 +981,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}

 	s.consistIndex.SetBackend(newbe)
+	newbe.SetTxPostLockHook(s.getTxPostLockHook())
+
 	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

 	// Closing old backend might block until all the txns
@@ -1547,6 +1552,15 @@ func (s *EtcdServer) getTerm() uint64 {
 	return atomic.LoadUint64(&s.term)
 }

+func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) {
+	atomic.StoreUint64(&s.consistentIdx, cIdx)
+	atomic.StoreUint64(&s.consistentTerm, cTerm)
+}
+
+func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) {
+	return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm)
+}
+
 func (s *EtcdServer) setLead(v uint64) {
 	atomic.StoreUint64(&s.lead, v)
 }
@@ -1771,7 +1785,7 @@ func (s *EtcdServer) apply(

 		// set the consistent index of current executing entry
 		if e.Index > s.consistIndex.ConsistentIndex() {
-			s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+			s.setConsistentIndexAndTerm(e.Index, e.Term)
 			shouldApplyV3 = membership.ApplyBoth
 		}

@@ -1801,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	index := s.consistIndex.ConsistentIndex()
 	if e.Index > index {
 		// set the consistent index of current executing entry
-		s.consistIndex.SetConsistentIndex(e.Index, e.Term)
+		s.setConsistentIndexAndTerm(e.Index, e.Term)
 		shouldApplyV3 = membership.ApplyBoth
 	}
 	s.lg.Debug("apply entry normal",
@@ -2296,3 +2310,12 @@ func (s *EtcdServer) raftStatus() raft.Status {
 func (s *EtcdServer) Version() *serverversion.Manager {
 	return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
 }
+
+func (s *EtcdServer) getTxPostLockHook() func() {
+	return func() {
+		cIdx, term := s.getConsistentIndexAndTerm()
+		if cIdx > s.consistIndex.UnsafeConsistentIndex() {
+			s.consistIndex.SetConsistentIndex(cIdx, term)
+		}
+	}
+}
@@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {

 	_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
 	consistIndex := srv.consistIndex.ConsistentIndex()
-	if consistIndex != appliedi {
-		t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
-	}
+	assert.Equal(t, uint64(2), appliedi)

 	t.Run("verify-backend", func(t *testing.T) {
 		tx := be.BatchTx()
@@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
 		srv.beHooks.OnPreCommitUnsafe(tx)
 		assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
 	})
-	rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
+	rindex, _ := schema.ReadConsistentIndex(be.ReadTx())
 	assert.Equal(t, consistIndex, rindex)
-	assert.Equal(t, uint64(4), rterm)
 }

 func realisticRaftNode(lg *zap.Logger) *raftNode {
@@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
 func (le *lessor) initAndRecover() {
 	tx := le.b.BatchTx()

-	tx.Lock()
+	tx.LockWithoutHook()
 	schema.UnsafeCreateLeaseBucket(tx)
 	lpbs := schema.MustUnsafeGetAllLeases(tx)
 	tx.Unlock()
@@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
 func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) {
 	consistentIndex := uint64(0)
 	if beExist {
-		consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx())
+		consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx())
 	}
 	if snapshot.Metadata.Index <= consistentIndex {
 		return oldbe, nil
@@ -67,6 +67,9 @@ type Backend interface {
 	Defrag() error
 	ForceCommit()
 	Close() error
+
+	// SetTxPostLockHook sets a txPostLockHook.
+	SetTxPostLockHook(func())
 }

 type Snapshot interface {
@@ -119,6 +122,9 @@ type backend struct {

 	hooks Hooks

+	// txPostLockHook is called each time right after locking the tx.
+	txPostLockHook func()
+
 	lg *zap.Logger
 }
@@ -227,6 +233,14 @@ func (b *backend) BatchTx() BatchTx {
 	return b.batchTx
 }

+func (b *backend) SetTxPostLockHook(hook func()) {
+	// It needs to lock the batchTx, because the periodic commit
+	// may be accessing the txPostLockHook at the moment.
+	b.batchTx.LockWithoutHook()
+	defer b.batchTx.Unlock()
+	b.txPostLockHook = hook
+}
+
 func (b *backend) ReadTx() ReadTx { return b.readTx }

 // ConcurrentReadTx creates and returns a new ReadTx, which:
@@ -438,7 +452,7 @@ func (b *backend) defrag() error {
 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then
 	// close previous ongoing tx.
-	b.batchTx.Lock()
+	b.batchTx.LockWithoutHook()
 	defer b.batchTx.Unlock()

 	// lock database after lock tx to avoid deadlock.
@@ -55,6 +55,7 @@ type BatchTx interface {
 	CommitAndStop()
 	LockInsideApply()
 	LockOutsideApply()
+	LockWithoutHook()
 }

 type batchTx struct {
@@ -66,6 +67,13 @@ type batchTx struct {
 }

 func (t *batchTx) Lock() {
+	t.LockWithoutHook()
+	if t.backend.txPostLockHook != nil {
+		t.backend.txPostLockHook()
+	}
+}
+
+func (t *batchTx) LockWithoutHook() {
 	t.Mutex.Lock()
 }

@@ -226,14 +234,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error)

 // Commit commits a previous tx and begins a new writable one.
 func (t *batchTx) Commit() {
-	t.Lock()
+	t.LockWithoutHook()
 	t.commit(false)
 	t.Unlock()
 }

 // CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
-	t.Lock()
+	t.LockWithoutHook()
 	t.commit(true)
 	t.Unlock()
 }
@@ -303,13 +311,13 @@ func (t *batchTxBuffered) Unlock() {
 }

 func (t *batchTxBuffered) Commit() {
-	t.Lock()
+	t.LockWithoutHook()
 	t.commit(false)
 	t.Unlock()
 }

 func (t *batchTxBuffered) CommitAndStop() {
-	t.Lock()
+	t.LockWithoutHook()
 	t.commit(true)
 	t.Unlock()
 }
@@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
 	}

 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()
 	tx.UnsafeCreateBucket(schema.Key)
 	schema.UnsafeCreateMetaBucket(tx)
 	tx.Unlock()
@@ -331,7 +331,7 @@ func (s *store) restore() error {

 	// restore index
 	tx := s.b.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()

 	finishedCompact, found := UnsafeReadFinishedCompact(tx)
 	if found {
@@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 		start := time.Now()

 		tx := s.b.BatchTx()
-		tx.Lock()
+		tx.LockWithoutHook()
 		keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
 		for _, key := range keys {
 			rev = bytesToRev(key)
@@ -881,6 +881,7 @@ type fakeBatchTx struct {
 	rangeRespc chan rangeResp
 }

+func (b *fakeBatchTx) LockWithoutHook() {}
 func (b *fakeBatchTx) Lock() {}
 func (b *fakeBatchTx) Unlock() {}
 func (b *fakeBatchTx) RLock() {}
@@ -924,6 +925,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
 func (b *fakeBackend) ForceCommit() {}
 func (b *fakeBackend) Defrag() error { return nil }
 func (b *fakeBackend) Close() error { return nil }
+func (b *fakeBackend) SetTxPostLockHook(func()) {}

 type indexGetResp struct {
 	rev revision
@@ -34,7 +34,7 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend {

 func (s *alarmBackend) CreateAlarmBucket() {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Alarm)
 }
@@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend {

 func (abe *authBackend) CreateAuthBuckets() {
 	tx := abe.be.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Auth)
 	tx.UnsafeCreateBucket(AuthUsers)
@@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) {

 // CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
 func CreateMetaBucket(tx backend.BatchTx) {
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Meta)
 }
@@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
 // ReadConsistentIndex loads consistent index and term from given transaction.
 // returns 0 if the data are not found.
 func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
-	tx.Lock()
-	defer tx.Unlock()
+	tx.RLock()
+	defer tx.RUnlock()
 	return UnsafeReadConsistentIndex(tx)
 }

@@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) {
 // from the v3 backend.
 func (s *membershipBackend) TrimClusterFromBackend() error {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	tx.UnsafeDeleteBucket(Cluster)
 	return nil
@@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M
 func (s *membershipBackend) TrimMembershipFromBackend() error {
 	s.lg.Info("Trimming membership information from the backend...")
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	err := tx.UnsafeForEach(Members, func(k, v []byte) error {
 		tx.UnsafeDelete(Members, k)
@@ -167,7 +167,7 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr

 func (s *membershipBackend) MustCreateBackendBuckets() {
 	tx := s.be.BatchTx()
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(Members)
 	tx.UnsafeCreateBucket(MembersRemoved)
@@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla
 }

 func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error {
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	return p.unsafeExecute(lg, tx)
 }
@@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange)

 // execute runs actions required to migrate etcd storage between two minor versions.
 func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error {
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	return s.unsafeExecute(lg, tx)
 }
@@ -31,7 +31,7 @@ var (

 // Validate checks provided backend to confirm that schema used is supported.
 func Validate(lg *zap.Logger, tx backend.BatchTx) error {
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	return unsafeValidate(lg, tx)
 }
@@ -60,7 +60,7 @@ type WALVersion interface {
 // Migrate updates storage schema to provided target version.
 // Downgrading requires that provided WAL doesn't contain unsupported entries.
 func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error {
-	tx.Lock()
+	tx.LockWithoutHook()
 	defer tx.Unlock()
 	return UnsafeMigrate(lg, tx, w, target)
 }
@@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv
 // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot).
 // * v3.4 and older is not supported and will return error.
 func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) {
-	tx.Lock()
-	defer tx.Unlock()
+	tx.RLock()
+	defer tx.RUnlock()
 	return UnsafeDetectSchemaVersion(lg, tx)
 }

@@ -108,8 +107,7 @@ func MustVerifyIfEnabled(cfg Config) {
 }

 func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
-	tx := be.BatchTx()
-	index, term := schema.ReadConsistentIndex(tx)
+	index, term := schema.ReadConsistentIndex(be.ReadTx())
 	if cfg.ExactIndex && index != hardstate.Commit {
 		return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
 	}