diff --git a/etcdserver/server.go b/etcdserver/server.go index f0968692e..2da3a75d0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -553,11 +553,26 @@ func (s *EtcdServer) run() { <-appdonec }() - select { - case err := <-s.errorc: - plog.Errorf("%s", err) - plog.Infof("the data-dir used by this member must be removed.") - case <-s.stop: + var expiredLeaseC <-chan []*lease.Lease + if s.lessor != nil { + expiredLeaseC = s.lessor.ExpiredLeasesC() + } + + for { + select { + case leases := <-expiredLeaseC: + go func() { + for _, l := range leases { + s.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: int64(l.ID)}) + } + }() + case err := <-s.errorc: + plog.Errorf("%s", err) + plog.Infof("the data-dir used by this member must be removed.") + return + case <-s.stop: + return + } } } diff --git a/lease/lessor.go b/lease/lessor.go index f328a937f..421ab7942 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -36,7 +36,9 @@ var ( minLeaseTerm = 5 * time.Second leaseBucketName = []byte("lease") - ErrNotPrimary = errors.New("not a primary lessor") + forever = time.Unix(math.MaxInt64, 0) + + ErrNotPrimary = errors.New("not a primary lessor") ) type LeaseID int64 @@ -69,6 +71,9 @@ type Lessor interface { // Renew renews a lease with given ID. If the ID does not exist, an error // will be returned. Renew(id LeaseID) error + + // ExpiredLeasesC returens a chan that is used to receive expired leases. + ExpiredLeasesC() <-chan []*Lease } // lessor implements Lessor interface. @@ -108,6 +113,8 @@ type lessor struct { // The leased items can be recovered by iterating all the keys in kv. b backend.Backend + expiredC chan []*Lease + idgen *idutil.Generator } @@ -126,6 +133,8 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor { leaseMap: make(map[LeaseID]*Lease), b: b, dr: dr, + // expiredC is a small buffered chan to avoid unncessary blocking. + expiredC: make(chan []*Lease, 16), idgen: idutil.NewGenerator(lessorID, time.Now()), } l.initAndRecover() @@ -203,12 +212,22 @@ func (le *lessor) Promote() { defer le.mu.Unlock() le.primary = true + + // refresh the expiries of all leases. + for _, l := range le.leaseMap { + l.expiry = minExpiry(time.Now(), time.Now().Add(time.Duration(l.TTL)*time.Second)) + } } func (le *lessor) Demote() { le.mu.Lock() defer le.mu.Unlock() + // set the expiries of all leases to forever + for _, l := range le.leaseMap { + l.expiry = forever + } + le.primary = false } @@ -241,28 +260,38 @@ func (le *lessor) Recover(b backend.Backend, dr DeleteableRange) { le.initAndRecover() } +func (le *lessor) ExpiredLeasesC() <-chan []*Lease { + return le.expiredC +} + func (le *lessor) runLoop() { // TODO: stop runLoop for { + var ls []*Lease + le.mu.Lock() if le.primary { - le.revokeExpiredLeases(le.findExpiredLeases()) + ls = le.findExpiredLeases() } le.mu.Unlock() + + if len(ls) != 0 { + select { + case le.expiredC <- ls: + default: + // the receiver of expiredC is probably busy handling + // other stuff + // let's try this next time after 500ms + } + } + time.Sleep(500 * time.Millisecond) } } -func (le *lessor) revokeExpiredLeases(expired []*Lease) { - // TODO: send revoke request to these expired lease through raft. -} - // findExpiredLeases loops all the leases in the leaseMap and returns the expired // leases that needed to be revoked. func (le *lessor) findExpiredLeases() []*Lease { - le.mu.Lock() - defer le.mu.Unlock() - leases := make([]*Lease, 0, 16) now := time.Now() @@ -306,7 +335,8 @@ func (le *lessor) initAndRecover() { TTL: lpb.TTL, // itemSet will be filled in when recover key-value pairs - expiry: minExpiry(time.Now(), time.Now().Add(time.Second*time.Duration(lpb.TTL))), + // set expiry to forever, refresh when promoted + expiry: forever, } } tx.Unlock() diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 6ce3435c3..50ecf6bea 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -114,12 +114,15 @@ func TestLessorRenew(t *testing.T) { defer os.RemoveAll(dir) le := newLessor(1, be, &fakeDeleteable{}) + le.Promote() l := le.Grant(5) // manually change the ttl field l.TTL = 10 - - le.Renew(l.ID) + err := le.Renew(l.ID) + if err != nil { + t.Fatalf("failed to renew lease (%v)", err) + } l = le.get(l.ID) if l.expiry.Sub(time.Now()) < 9*time.Second {