diff --git a/etcdserver/server.go b/etcdserver/server.go index 4da708f6d..c0732b883 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -395,6 +395,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { srv.be = be minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond + + // Always recover the lessor before the kv. Recovering the mvcc.KV reattaches keys to their leases. + // If we recovered mvcc.KV first, it would attach the keys to the wrong lessor before the lessor recovers. srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds()))) srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex) if beExist { @@ -404,6 +407,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { } } srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) + srv.authStore = auth.NewAuthStore(srv.be) if h := cfg.AutoCompactionRetention; h != 0 { srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) @@ -660,6 +664,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { newbe := backend.NewDefaultBackend(fn) + // Always recover the lessor before the kv. Recovering the mvcc.KV reattaches keys to their leases. + // If we recovered mvcc.KV first, it would attach the keys to the wrong lessor before the lessor recovers. 
+ if s.lessor != nil { + plog.Info("recovering lessor...") + s.lessor.Recover(newbe, s.kv) + plog.Info("finished recovering lessor") + } + plog.Info("restoring mvcc store...") if err := s.kv.Restore(newbe); err != nil { @@ -686,12 +698,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { s.be = newbe s.bemu.Unlock() - if s.lessor != nil { - plog.Info("recovering lessor...") - s.lessor.Recover(newbe, s.kv) - plog.Info("finished recovering lessor") - } - plog.Info("recovering alarms...") if err := s.restoreAlarms(); err != nil { plog.Panicf("restore alarms error: %v", err) diff --git a/lease/lessor.go b/lease/lessor.go index 45c8c2520..682d7376b 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -43,13 +43,18 @@ var ( type LeaseID int64 -// RangeDeleter defines an interface with DeleteRange method. +// RangeDeleter defines an interface with the TxnBegin, TxnEnd and TxnDeleteRange methods. // We define this interface only for lessor to limit the number // of methods of mvcc.KV to what lessor actually needs. // // Having a minimum interface makes testing easy. type RangeDeleter interface { - DeleteRange(key, end []byte) (int64, int64) + // TxnBegin: see the comments on mvcc.KV. + TxnBegin() int64 + // TxnEnd: see the comments on mvcc.KV. + TxnEnd(txnID int64) error + // TxnDeleteRange: see the comments on mvcc.KV. + TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) } // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. 
@@ -218,16 +223,30 @@ func (le *lessor) Revoke(id LeaseID) error { // unlock before doing external work le.mu.Unlock() - if le.rd != nil { - for item := range l.itemSet { - le.rd.DeleteRange([]byte(item.Key), nil) + if le.rd == nil { + return nil // NOTE(review): returns before the lease is removed from leaseMap and the backend below, which the old code still did when rd was nil; confirm intended + } + + tid := le.rd.TxnBegin() + for item := range l.itemSet { + _, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil) + if err != nil { + panic(err) } } le.mu.Lock() defer le.mu.Unlock() delete(le.leaseMap, l.ID) - l.removeFrom(le.b) + // The lease deletion must be in the same backend transaction as the + // kv deletion. Otherwise we might end up not executing the revoke, or not + // deleting the keys, if etcdserver fails in between. + le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) + + err := le.rd.TxnEnd(tid) + if err != nil { + panic(err) + } return nil } @@ -453,14 +472,6 @@ func (l Lease) persistTo(b backend.Backend) { b.BatchTx().Unlock() } -func (l Lease) removeFrom(b backend.Backend) { - key := int64ToBytes(int64(l.ID)) - - b.BatchTx().Lock() - b.BatchTx().UnsafeDelete(leaseBucketName, key) - b.BatchTx().Unlock() -} - // refresh refreshes the expiry of the lease. 
func (l *Lease) refresh(extend time.Duration) { l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL)) diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 946e8c03b..adc682c81 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -225,9 +225,17 @@ type fakeDeleter struct { deleted []string } -func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) { +func (fd *fakeDeleter) TxnBegin() int64 { + return 0 +} + +func (fd *fakeDeleter) TxnEnd(txnID int64) error { + return nil +} + +func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) { fd.deleted = append(fd.deleted, string(key)+"_"+string(end)) - return 0, 0 + return 0, 0, nil } func NewTestBackend(t *testing.T) (string, backend.Backend) { diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index d1c71fd7c..249224221 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -367,6 +367,8 @@ func (s *store) restore() error { revToBytes(revision{main: 1}, min) revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) + keyToLease := make(map[string]lease.LeaseID) + // restore index tx := s.b.BatchTx() tx.Lock() @@ -390,26 +392,15 @@ func (s *store) restore() error { switch { case isTombstone(key): s.kvindex.Tombstone(kv.Key, rev) - if lease.LeaseID(kv.Lease) != lease.NoLease { - err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) - if err != nil && err != lease.ErrLeaseNotFound { - plog.Fatalf("unexpected Detach error %v", err) - } - } + delete(keyToLease, string(kv.Key)) + default: s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) - if lease.LeaseID(kv.Lease) != lease.NoLease { - if s.le == nil { - panic("no lessor to attach lease") - } - err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) - // We are walking through the kv history here. It is possible that we attached a key to - // the lease and the lease was revoked later. 
- // Thus attaching an old version of key to a none existing lease is possible here, and - // we should just ignore the error. - if err != nil && err != lease.ErrLeaseNotFound { - panic("unexpected Attach error") - } + + if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease { + keyToLease[string(kv.Key)] = lid + } else { + delete(keyToLease, string(kv.Key)) } } @@ -417,6 +408,16 @@ func (s *store) restore() error { s.currentRev = rev } + for key, lid := range keyToLease { + if s.le == nil { + panic("no lessor to attach lease") + } + err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) + if err != nil { + plog.Errorf("unexpected Attach error: %v", err) + } + } + _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) scheduledCompact := int64(0) if len(scheduledCompactBytes) != 0 { @@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) { err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) if err != nil { - panic("unexpected error from lease detach") + plog.Errorf("unexpected error from lease detach: %v", err) } } @@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) { if lease.LeaseID(kv.Lease) != lease.NoLease { err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) if err != nil { - plog.Fatalf("cannot detach %v", err) + plog.Errorf("cannot detach %v", err) } } }