mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: minor cleanup for lease
This commit is contained in:
parent
bd62b0a646
commit
d69d438289
@ -395,6 +395,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
|
||||
srv.be = be
|
||||
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
|
||||
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
||||
if beExist {
|
||||
@ -404,6 +407,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}
|
||||
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
||||
|
||||
srv.authStore = auth.NewAuthStore(srv.be)
|
||||
if h := cfg.AutoCompactionRetention; h != 0 {
|
||||
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
||||
@ -660,6 +664,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
|
||||
newbe := backend.NewDefaultBackend(fn)
|
||||
|
||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
|
||||
if s.lessor != nil {
|
||||
plog.Info("recovering lessor...")
|
||||
s.lessor.Recover(newbe, s.kv)
|
||||
|
@ -223,29 +223,31 @@ func (le *lessor) Revoke(id LeaseID) error {
|
||||
// unlock before doing external work
|
||||
le.mu.Unlock()
|
||||
|
||||
if le.rd != nil {
|
||||
tid := le.rd.TxnBegin()
|
||||
for item := range l.itemSet {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
if le.rd == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
delete(le.leaseMap, l.ID)
|
||||
// lease deletion needs to be in the same backend transcation with the
|
||||
// kv deletion. Or we might end up with not executing the revoke or not
|
||||
// deleting the keys if etcdserver fails in between.
|
||||
l.removeFrom(le.b)
|
||||
|
||||
err := le.rd.TxnEnd(tid)
|
||||
tid := le.rd.TxnBegin()
|
||||
for item := range l.itemSet {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
delete(le.leaseMap, l.ID)
|
||||
// lease deletion needs to be in the same backend transaction with the
|
||||
// kv deletion. Or we might end up with not executing the revoke or not
|
||||
// deleting the keys if etcdserver fails in between.
|
||||
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
|
||||
|
||||
err := le.rd.TxnEnd(tid)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -470,12 +472,6 @@ func (l Lease) persistTo(b backend.Backend) {
|
||||
b.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
func (l Lease) removeFrom(b backend.Backend) {
|
||||
key := int64ToBytes(int64(l.ID))
|
||||
|
||||
b.BatchTx().UnsafeDelete(leaseBucketName, key)
|
||||
}
|
||||
|
||||
// refresh refreshes the expiry of the lease.
|
||||
func (l *Lease) refresh(extend time.Duration) {
|
||||
l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL))
|
||||
|
Loading…
x
Reference in New Issue
Block a user