lease: fix lease expire and add a test

This commit is contained in:
Xiang Li 2016-10-03 16:01:56 +08:00
parent c349e089b1
commit 279c103517
2 changed files with 168 additions and 24 deletions

View File

@ -110,20 +110,9 @@ type Lessor interface {
type lessor struct {
mu sync.Mutex
// primary indicates if this lessor is the primary lessor. The primary
// lessor manages lease expiration and renew.
//
// in etcd, raft leader is the primary. Thus there might be two primary
// leaders at the same time (raft allows concurrent leader but with different term)
// for at most a leader election timeout.
// The old primary leader cannot affect the correctness since its proposal has a
// smaller term and will not be committed.
//
// TODO: raft follower do not forward lease management proposals. There might be a
// very small window (within second normally which depends on go scheduling) that
// a raft follow is the primary between the raft leader demotion and lessor demotion.
// Usually this should not be a problem. Lease should not be that sensitive to timing.
primary bool
// demotec is set when the lessor is the primary.
// demotec will be closed if the lessor is demoted.
demotec chan struct{}
// TODO: probably this should be a heap with a secondary
// id index.
@ -173,6 +162,23 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
return l
}
// isPrimary indicates if this lessor is the primary lessor. The primary
// lessor manages lease expiration and renew.
//
// in etcd, raft leader is the primary. Thus there might be two primary
// leaders at the same time (raft allows concurrent leader but with different term)
// for at most a leader election timeout.
// The old primary leader cannot affect the correctness since its proposal has a
// smaller term and will not be committed.
//
// TODO: raft follower do not forward lease management proposals. There might be a
// very small window (within second normally which depends on go scheduling) that
// a raft follow is the primary between the raft leader demotion and lessor demotion.
// Usually this should not be a problem. Lease should not be that sensitive to timing.
func (le *lessor) isPrimary() bool {
return le.demotec != nil
}
func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
le.mu.Lock()
defer le.mu.Unlock()
@ -187,7 +193,12 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})}
l := &Lease{
ID: id,
TTL: ttl,
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}
le.mu.Lock()
defer le.mu.Unlock()
@ -200,7 +211,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
l.TTL = le.minLeaseTTL
}
if le.primary {
if le.isPrimary() {
l.refresh(0)
} else {
l.forever()
@ -220,6 +231,7 @@ func (le *lessor) Revoke(id LeaseID) error {
le.mu.Unlock()
return ErrLeaseNotFound
}
defer close(l.revokec)
// unlock before doing external work
le.mu.Unlock()
@ -255,18 +267,40 @@ func (le *lessor) Revoke(id LeaseID) error {
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
le.mu.Lock()
defer le.mu.Unlock()
if !le.primary {
unlock := func() { le.mu.Unlock() }
defer func() { unlock() }()
if !le.isPrimary() {
// forward renew request to primary instead of returning error.
return -1, ErrNotPrimary
}
demotec := le.demotec
l := le.leaseMap[id]
if l == nil {
return -1, ErrLeaseNotFound
}
if l.expired() {
le.mu.Unlock()
unlock = func() {}
select {
// A expired lease might be pending for revoking or going through
// quorum to be revoked. To be accurate, renew request must wait for the
// deletion to complete.
case <-l.revokec:
return -1, ErrLeaseNotFound
// The expired lease might fail to be revoked if the primary changes.
// The caller will retry on ErrNotPrimary.
case <-demotec:
return -1, ErrNotPrimary
case <-le.stopC:
return -1, ErrNotPrimary
}
}
l.refresh(0)
return l.TTL, nil
}
@ -284,7 +318,7 @@ func (le *lessor) Promote(extend time.Duration) {
le.mu.Lock()
defer le.mu.Unlock()
le.primary = true
le.demotec = make(chan struct{})
// refresh the expiries of all leases.
for _, l := range le.leaseMap {
@ -301,7 +335,10 @@ func (le *lessor) Demote() {
l.forever()
}
le.primary = false
if le.demotec != nil {
close(le.demotec)
le.demotec = nil
}
}
// Attach attaches items to the lease with given ID. When the lease
@ -366,7 +403,7 @@ func (le *lessor) runLoop() {
var ls []*Lease
le.mu.Lock()
if le.primary {
if le.isPrimary() {
ls = le.findExpiredLeases()
}
le.mu.Unlock()
@ -395,12 +432,11 @@ func (le *lessor) runLoop() {
// leases that needed to be revoked.
func (le *lessor) findExpiredLeases() []*Lease {
leases := make([]*Lease, 0, 16)
now := time.Now()
for _, l := range le.leaseMap {
// TODO: probably should change to <= 100-500 millisecond to
// make up committing latency.
if l.expiry.Sub(now) <= 0 {
if l.expired() {
leases = append(leases, l)
}
}
@ -442,6 +478,7 @@ func (le *lessor) initAndRecover() {
// set expiry to forever, refresh when promoted
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
}
}
tx.Unlock()
@ -455,7 +492,12 @@ type Lease struct {
itemSet map[LeaseItem]struct{}
// expiry time in unixnano
expiry time.Time
expiry time.Time
revokec chan struct{}
}
func (l Lease) expired() bool {
return l.Remaining() <= 0
}
func (l Lease) persistTo(b backend.Backend) {

View File

@ -221,6 +221,108 @@ func TestLessorRecover(t *testing.T) {
}
}
func TestLessorExpire(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
testMinTTL := int64(1)
le := newLessor(be, testMinTTL)
defer le.Stop()
le.Promote(1 * time.Second)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}
select {
case el := <-le.ExpiredLeasesC():
if el[0].ID != l.ID {
t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
}
case <-time.After(10 * time.Second):
t.Fatalf("failed to receive expired lease")
}
donec := make(chan struct{})
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
t.Fatalf("unexpected renew")
}
donec <- struct{}{}
}()
select {
case <-donec:
t.Fatalf("renew finished before lease revocation")
case <-time.After(50 * time.Millisecond):
}
// expired lease can be revoked
if err := le.Revoke(l.ID); err != nil {
t.Fatalf("failed to revoke expired lease: %v", err)
}
select {
case <-donec:
case <-time.After(10 * time.Second):
t.Fatalf("renew has not returned after lease revocation")
}
}
func TestLessorExpireAndDemote(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()
testMinTTL := int64(1)
le := newLessor(be, testMinTTL)
defer le.Stop()
le.Promote(1 * time.Second)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}
select {
case el := <-le.ExpiredLeasesC():
if el[0].ID != l.ID {
t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
}
case <-time.After(10 * time.Second):
t.Fatalf("failed to receive expired lease")
}
donec := make(chan struct{})
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrNotPrimary {
t.Fatalf("unexpected renew: %v", err)
}
donec <- struct{}{}
}()
select {
case <-donec:
t.Fatalf("renew finished before demotion")
case <-time.After(50 * time.Millisecond):
}
// demote will cause the renew request to fail with ErrNotPrimary
le.Demote()
select {
case <-donec:
case <-time.After(10 * time.Second):
t.Fatalf("renew has not returned after lessor demotion")
}
}
type fakeDeleter struct {
deleted []string
}