mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
4a7fabd219
@ -395,6 +395,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
|
||||
srv.be = be
|
||||
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
|
||||
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
||||
if beExist {
|
||||
@ -404,6 +407,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}
|
||||
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||
|
||||
srv.authStore = auth.NewAuthStore(srv.be)
|
||||
if h := cfg.AutoCompactionRetention; h != 0 {
|
||||
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
||||
@ -660,6 +664,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
|
||||
newbe := backend.NewDefaultBackend(fn)
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
if s.lessor != nil {
|
||||
plog.Info("recovering lessor...")
|
||||
s.lessor.Recover(newbe, s.kv)
|
||||
plog.Info("finished recovering lessor")
|
||||
}
|
||||
|
||||
plog.Info("restoring mvcc store...")
|
||||
|
||||
if err := s.kv.Restore(newbe); err != nil {
|
||||
@ -686,12 +698,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
s.be = newbe
|
||||
s.bemu.Unlock()
|
||||
|
||||
if s.lessor != nil {
|
||||
plog.Info("recovering lessor...")
|
||||
s.lessor.Recover(newbe, s.kv)
|
||||
plog.Info("finished recovering lessor")
|
||||
}
|
||||
|
||||
plog.Info("recovering alarms...")
|
||||
if err := s.restoreAlarms(); err != nil {
|
||||
plog.Panicf("restore alarms error: %v", err)
|
||||
|
@ -43,13 +43,18 @@ var (
|
||||
|
||||
type LeaseID int64
|
||||
|
||||
// RangeDeleter defines an interface with DeleteRange method.
|
||||
// RangeDeleter defines an interface with Txn and DeleteRange method.
|
||||
// We define this interface only for lessor to limit the number
|
||||
// of methods of mvcc.KV to what lessor actually needs.
|
||||
//
|
||||
// Having a minimum interface makes testing easy.
|
||||
type RangeDeleter interface {
|
||||
DeleteRange(key, end []byte) (int64, int64)
|
||||
// TxnBegin see comments on mvcc.KV
|
||||
TxnBegin() int64
|
||||
// TxnEnd see comments on mvcc.KV
|
||||
TxnEnd(txnID int64) error
|
||||
// TxnDeleteRange see comments on mvcc.KV
|
||||
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
|
||||
}
|
||||
|
||||
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
|
||||
@ -218,16 +223,30 @@ func (le *lessor) Revoke(id LeaseID) error {
|
||||
// unlock before doing external work
|
||||
le.mu.Unlock()
|
||||
|
||||
if le.rd != nil {
|
||||
for item := range l.itemSet {
|
||||
le.rd.DeleteRange([]byte(item.Key), nil)
|
||||
if le.rd == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
tid := le.rd.TxnBegin()
|
||||
for item := range l.itemSet {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
delete(le.leaseMap, l.ID)
|
||||
l.removeFrom(le.b)
|
||||
// lease deletion needs to be in the same backend transaction with the
|
||||
// kv deletion. Or we might end up with not executing the revoke or not
|
||||
// deleting the keys if etcdserver fails in between.
|
||||
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
|
||||
|
||||
err := le.rd.TxnEnd(tid)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -453,14 +472,6 @@ func (l Lease) persistTo(b backend.Backend) {
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
func (l Lease) removeFrom(b backend.Backend) {
|
||||
key := int64ToBytes(int64(l.ID))
|
||||
|
||||
b.BatchTx().Lock()
|
||||
b.BatchTx().UnsafeDelete(leaseBucketName, key)
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
// refresh refreshes the expiry of the lease.
|
||||
func (l *Lease) refresh(extend time.Duration) {
|
||||
l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL))
|
||||
|
@ -225,9 +225,17 @@ type fakeDeleter struct {
|
||||
deleted []string
|
||||
}
|
||||
|
||||
func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
|
||||
func (fd *fakeDeleter) TxnBegin() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (fd *fakeDeleter) TxnEnd(txnID int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
|
||||
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
|
||||
return 0, 0
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
func NewTestBackend(t *testing.T) (string, backend.Backend) {
|
||||
|
@ -367,6 +367,8 @@ func (s *store) restore() error {
|
||||
revToBytes(revision{main: 1}, min)
|
||||
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
|
||||
|
||||
keyToLease := make(map[string]lease.LeaseID)
|
||||
|
||||
// restore index
|
||||
tx := s.b.BatchTx()
|
||||
tx.Lock()
|
||||
@ -390,26 +392,15 @@ func (s *store) restore() error {
|
||||
switch {
|
||||
case isTombstone(key):
|
||||
s.kvindex.Tombstone(kv.Key, rev)
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
if err != nil && err != lease.ErrLeaseNotFound {
|
||||
plog.Fatalf("unexpected Detach error %v", err)
|
||||
}
|
||||
}
|
||||
delete(keyToLease, string(kv.Key))
|
||||
|
||||
default:
|
||||
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
if s.le == nil {
|
||||
panic("no lessor to attach lease")
|
||||
}
|
||||
err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
// We are walking through the kv history here. It is possible that we attached a key to
|
||||
// the lease and the lease was revoked later.
|
||||
// Thus attaching an old version of key to a none existing lease is possible here, and
|
||||
// we should just ignore the error.
|
||||
if err != nil && err != lease.ErrLeaseNotFound {
|
||||
panic("unexpected Attach error")
|
||||
}
|
||||
|
||||
if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
|
||||
keyToLease[string(kv.Key)] = lid
|
||||
} else {
|
||||
delete(keyToLease, string(kv.Key))
|
||||
}
|
||||
}
|
||||
|
||||
@ -417,6 +408,16 @@ func (s *store) restore() error {
|
||||
s.currentRev = rev
|
||||
}
|
||||
|
||||
for key, lid := range keyToLease {
|
||||
if s.le == nil {
|
||||
panic("no lessor to attach lease")
|
||||
}
|
||||
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
|
||||
if err != nil {
|
||||
plog.Errorf("unexpected Attach error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
|
||||
scheduledCompact := int64(0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
|
||||
|
||||
err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
|
||||
if err != nil {
|
||||
panic("unexpected error from lease detach")
|
||||
plog.Errorf("unexpected error from lease detach: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
|
||||
if lease.LeaseID(kv.Lease) != lease.NoLease {
|
||||
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot detach %v", err)
|
||||
plog.Errorf("cannot detach %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user