mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: minor cleanup for lease
This commit is contained in:
@@ -405,6 +405,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
|
||||
srv.be = be
|
||||
srv.lessor = lease.NewLessor(srv.be)
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
srv.lessor = lease.NewLessor(srv.be)
|
||||
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
||||
if beExist {
|
||||
kvindex := srv.kv.ConsistentIndex()
|
||||
@@ -413,6 +417,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}
|
||||
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||
|
||||
srv.authStore = auth.NewAuthStore(srv.be)
|
||||
if h := cfg.AutoCompactionRetention; h != 0 {
|
||||
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
||||
@@ -658,6 +663,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
|
||||
newbe := backend.NewDefaultBackend(fn)
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
if s.lessor != nil {
|
||||
plog.Info("recovering lessor...")
|
||||
s.lessor.Recover(newbe, s.kv)
|
||||
|
||||
@@ -216,29 +216,31 @@ func (le *lessor) Revoke(id LeaseID) error {
|
||||
// unlock before doing external work
|
||||
le.mu.Unlock()
|
||||
|
||||
if le.rd != nil {
|
||||
tid := le.rd.TxnBegin()
|
||||
for item := range l.itemSet {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
if le.rd == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
delete(le.leaseMap, l.ID)
|
||||
// lease deletion needs to be in the same backend transcation with the
|
||||
// kv deletion. Or we might end up with not executing the revoke or not
|
||||
// deleting the keys if etcdserver fails in between.
|
||||
l.removeFrom(le.b)
|
||||
|
||||
err := le.rd.TxnEnd(tid)
|
||||
tid := le.rd.TxnBegin()
|
||||
for item := range l.itemSet {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
delete(le.leaseMap, l.ID)
|
||||
// lease deletion needs to be in the same backend transaction with the
|
||||
// kv deletion. Or we might end up with not executing the revoke or not
|
||||
// deleting the keys if etcdserver fails in between.
|
||||
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
|
||||
|
||||
err := le.rd.TxnEnd(tid)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -460,14 +462,7 @@ func (l Lease) persistTo(b backend.Backend) {
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
func (l Lease) removeFrom(b backend.Backend) {
|
||||
key := int64ToBytes(int64(l.ID))
|
||||
|
||||
b.BatchTx().UnsafeDelete(leaseBucketName, key)
|
||||
}
|
||||
|
||||
// refresh refreshes the expiry of the lease. It extends the expiry at least
|
||||
// minLeaseTTL second.
|
||||
// refresh refreshes the expiry of the lease.
|
||||
func (l *Lease) refresh(extend time.Duration) {
|
||||
if l.TTL < minLeaseTTL {
|
||||
l.TTL = minLeaseTTL
|
||||
|
||||
Reference in New Issue
Block a user