Merge pull request #4175 from xiang90/lease_expire

*: revoke expired leases
This commit is contained in:
Xiang Li 2016-01-08 13:47:06 -08:00
commit 0554a18060
3 changed files with 65 additions and 17 deletions

View File

@ -553,11 +553,26 @@ func (s *EtcdServer) run() {
<-appdonec <-appdonec
}() }()
var expiredLeaseC <-chan []*lease.Lease
if s.lessor != nil {
expiredLeaseC = s.lessor.ExpiredLeasesC()
}
for {
select { select {
case leases := <-expiredLeaseC:
go func() {
for _, l := range leases {
s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(l.ID)})
}
}()
case err := <-s.errorc: case err := <-s.errorc:
plog.Errorf("%s", err) plog.Errorf("%s", err)
plog.Infof("the data-dir used by this member must be removed.") plog.Infof("the data-dir used by this member must be removed.")
return
case <-s.stop: case <-s.stop:
return
}
} }
} }

View File

@ -36,6 +36,8 @@ var (
minLeaseTerm = 5 * time.Second minLeaseTerm = 5 * time.Second
leaseBucketName = []byte("lease") leaseBucketName = []byte("lease")
forever = time.Unix(math.MaxInt64, 0)
ErrNotPrimary = errors.New("not a primary lessor") ErrNotPrimary = errors.New("not a primary lessor")
) )
@ -69,6 +71,9 @@ type Lessor interface {
// Renew renews a lease with given ID. If the ID does not exist, an error // Renew renews a lease with given ID. If the ID does not exist, an error
// will be returned. // will be returned.
Renew(id LeaseID) error Renew(id LeaseID) error
// ExpiredLeasesC returens a chan that is used to receive expired leases.
ExpiredLeasesC() <-chan []*Lease
} }
// lessor implements Lessor interface. // lessor implements Lessor interface.
@ -108,6 +113,8 @@ type lessor struct {
// The leased items can be recovered by iterating all the keys in kv. // The leased items can be recovered by iterating all the keys in kv.
b backend.Backend b backend.Backend
expiredC chan []*Lease
idgen *idutil.Generator idgen *idutil.Generator
} }
@ -126,6 +133,8 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
leaseMap: make(map[LeaseID]*Lease), leaseMap: make(map[LeaseID]*Lease),
b: b, b: b,
dr: dr, dr: dr,
// expiredC is a small buffered chan to avoid unncessary blocking.
expiredC: make(chan []*Lease, 16),
idgen: idutil.NewGenerator(lessorID, time.Now()), idgen: idutil.NewGenerator(lessorID, time.Now()),
} }
l.initAndRecover() l.initAndRecover()
@ -203,12 +212,22 @@ func (le *lessor) Promote() {
defer le.mu.Unlock() defer le.mu.Unlock()
le.primary = true le.primary = true
// refresh the expiries of all leases.
for _, l := range le.leaseMap {
l.expiry = minExpiry(time.Now(), time.Now().Add(time.Duration(l.TTL)*time.Second))
}
} }
func (le *lessor) Demote() { func (le *lessor) Demote() {
le.mu.Lock() le.mu.Lock()
defer le.mu.Unlock() defer le.mu.Unlock()
// set the expiries of all leases to forever
for _, l := range le.leaseMap {
l.expiry = forever
}
le.primary = false le.primary = false
} }
@ -241,28 +260,38 @@ func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) {
le.initAndRecover() le.initAndRecover()
} }
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
return le.expiredC
}
func (le *lessor) runLoop() { func (le *lessor) runLoop() {
// TODO: stop runLoop // TODO: stop runLoop
for { for {
var ls []*Lease
le.mu.Lock() le.mu.Lock()
if le.primary { if le.primary {
le.revokeExpiredLeases(le.findExpiredLeases()) ls = le.findExpiredLeases()
} }
le.mu.Unlock() le.mu.Unlock()
time.Sleep(500 * time.Millisecond)
if len(ls) != 0 {
select {
case le.expiredC <- ls:
default:
// the receiver of expiredC is probably busy handling
// other stuff
// let's try this next time after 500ms
} }
} }
func (le *lessor) revokeExpiredLeases(expired []*Lease) { time.Sleep(500 * time.Millisecond)
// TODO: send revoke request to these expired lease through raft. }
} }
// findExpiredLeases loops all the leases in the leaseMap and returns the expired // findExpiredLeases loops all the leases in the leaseMap and returns the expired
// leases that needed to be revoked. // leases that needed to be revoked.
func (le *lessor) findExpiredLeases() []*Lease { func (le *lessor) findExpiredLeases() []*Lease {
le.mu.Lock()
defer le.mu.Unlock()
leases := make([]*Lease, 0, 16) leases := make([]*Lease, 0, 16)
now := time.Now() now := time.Now()
@ -306,7 +335,8 @@ func (le *lessor) initAndRecover() {
TTL: lpb.TTL, TTL: lpb.TTL,
// itemSet will be filled in when recover key-value pairs // itemSet will be filled in when recover key-value pairs
expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))), // set expiry to forever, refresh when promoted
expiry: forever,
} }
} }
tx.Unlock() tx.Unlock()

View File

@ -114,12 +114,15 @@ func TestLessorRenew(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
le := newLessor(1, be, &fakeDeleteable{}) le := newLessor(1, be, &fakeDeleteable{})
le.Promote()
l := le.Grant(5) l := le.Grant(5)
// manually change the ttl field // manually change the ttl field
l.TTL = 10 l.TTL = 10
err := le.Renew(l.ID)
le.Renew(l.ID) if err != nil {
t.Fatalf("failed to renew lease (%v)", err)
}
l = le.get(l.ID) l = le.get(l.ID)
if l.expiry.Sub(time.Now()) < 9*time.Second { if l.expiry.Sub(time.Now()) < 9*time.Second {