From 279c103517326daef10659859e6ba6587e194594 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 3 Oct 2016 16:01:56 +0800 Subject: [PATCH 1/2] lease: fix lease expire and add a test --- lease/lessor.go | 90 ++++++++++++++++++++++++++++---------- lease/lessor_test.go | 102 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 24 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index 5ebe7290c..4d5d1bc08 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -110,20 +110,9 @@ type Lessor interface { type lessor struct { mu sync.Mutex - // primary indicates if this lessor is the primary lessor. The primary - // lessor manages lease expiration and renew. - // - // in etcd, raft leader is the primary. Thus there might be two primary - // leaders at the same time (raft allows concurrent leader but with different term) - // for at most a leader election timeout. - // The old primary leader cannot affect the correctness since its proposal has a - // smaller term and will not be committed. - // - // TODO: raft follower do not forward lease management proposals. There might be a - // very small window (within second normally which depends on go scheduling) that - // a raft follow is the primary between the raft leader demotion and lessor demotion. - // Usually this should not be a problem. Lease should not be that sensitive to timing. - primary bool + // demotec is set when the lessor is the primary. + // demotec will be closed if the lessor is demoted. + demotec chan struct{} // TODO: probably this should be a heap with a secondary // id index. @@ -173,6 +162,23 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { return l } +// isPrimary indicates if this lessor is the primary lessor. The primary +// lessor manages lease expiration and renew. +// +// in etcd, raft leader is the primary. Thus there might be two primary +// leaders at the same time (raft allows concurrent leader but with different term) +// for at most a leader election timeout. +// The old primary leader cannot affect the correctness since its proposal has a +// smaller term and will not be committed. +// +// TODO: raft follower do not forward lease management proposals. There might be a +// very small window (within second normally which depends on go scheduling) that +// a raft follow is the primary between the raft leader demotion and lessor demotion. +// Usually this should not be a problem. Lease should not be that sensitive to timing. +func (le *lessor) isPrimary() bool { + return le.demotec != nil +} + func (le *lessor) SetRangeDeleter(rd RangeDeleter) { le.mu.Lock() defer le.mu.Unlock() @@ -187,7 +193,12 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { // TODO: when lessor is under high load, it should give out lease // with longer TTL to reduce renew load. - l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})} + l := &Lease{ + ID: id, + TTL: ttl, + itemSet: make(map[LeaseItem]struct{}), + revokec: make(chan struct{}), + } le.mu.Lock() defer le.mu.Unlock() @@ -200,7 +211,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { l.TTL = le.minLeaseTTL } - if le.primary { + if le.isPrimary() { l.refresh(0) } else { l.forever() @@ -220,6 +231,7 @@ func (le *lessor) Revoke(id LeaseID) error { le.mu.Unlock() return ErrLeaseNotFound } + defer close(l.revokec) // unlock before doing external work le.mu.Unlock() @@ -255,18 +267,40 @@ func (le *lessor) Revoke(id LeaseID) error { // has expired, an error will be returned. func (le *lessor) Renew(id LeaseID) (int64, error) { le.mu.Lock() - defer le.mu.Unlock() - if !le.primary { + unlock := func() { le.mu.Unlock() } + defer func() { unlock() }() + + if !le.isPrimary() { // forward renew request to primary instead of returning error. return -1, ErrNotPrimary } + demotec := le.demotec + l := le.leaseMap[id] if l == nil { return -1, ErrLeaseNotFound } + if l.expired() { + le.mu.Unlock() + unlock = func() {} + select { + // A expired lease might be pending for revoking or going through + // quorum to be revoked. To be accurate, renew request must wait for the + // deletion to complete. + case <-l.revokec: + return -1, ErrLeaseNotFound + // The expired lease might fail to be revoked if the primary changes. + // The caller will retry on ErrNotPrimary. + case <-demotec: + return -1, ErrNotPrimary + case <-le.stopC: + return -1, ErrNotPrimary + } + } + l.refresh(0) return l.TTL, nil } @@ -284,7 +318,7 @@ func (le *lessor) Promote(extend time.Duration) { le.mu.Lock() defer le.mu.Unlock() - le.primary = true + le.demotec = make(chan struct{}) // refresh the expiries of all leases. for _, l := range le.leaseMap { @@ -301,7 +335,10 @@ func (le *lessor) Demote() { l.forever() } - le.primary = false + if le.demotec != nil { + close(le.demotec) + le.demotec = nil + } } // Attach attaches items to the lease with given ID. When the lease @@ -366,7 +403,7 @@ func (le *lessor) runLoop() { var ls []*Lease le.mu.Lock() - if le.primary { + if le.isPrimary() { ls = le.findExpiredLeases() } le.mu.Unlock() @@ -395,12 +432,11 @@ func (le *lessor) runLoop() { // leases that needed to be revoked. func (le *lessor) findExpiredLeases() []*Lease { leases := make([]*Lease, 0, 16) - now := time.Now() for _, l := range le.leaseMap { // TODO: probably should change to <= 100-500 millisecond to // make up committing latency. - if l.expiry.Sub(now) <= 0 { + if l.expired() { leases = append(leases, l) } } @@ -442,6 +478,7 @@ func (le *lessor) initAndRecover() { // set expiry to forever, refresh when promoted itemSet: make(map[LeaseItem]struct{}), expiry: forever, + revokec: make(chan struct{}), } } tx.Unlock() @@ -455,7 +492,12 @@ type Lease struct { itemSet map[LeaseItem]struct{} // expiry time in unixnano - expiry time.Time + expiry time.Time + revokec chan struct{} +} + +func (l Lease) expired() bool { + return l.Remaining() <= 0 } func (l Lease) persistTo(b backend.Backend) { diff --git a/lease/lessor_test.go b/lease/lessor_test.go index adc682c81..2081a48f7 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -221,6 +221,108 @@ func TestLessorRecover(t *testing.T) { } } +func TestLessorExpire(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + testMinTTL := int64(1) + + le := newLessor(be, testMinTTL) + defer le.Stop() + + le.Promote(1 * time.Second) + l, err := le.Grant(1, testMinTTL) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + + select { + case el := <-le.ExpiredLeasesC(): + if el[0].ID != l.ID { + t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to receive expired lease") + } + + donec := make(chan struct{}) + go func() { + // expired lease cannot be renewed + if _, err := le.Renew(l.ID); err != ErrLeaseNotFound { + t.Fatalf("unexpected renew") + } + donec <- struct{}{} + }() + + select { + case <-donec: + t.Fatalf("renew finished before lease revocation") + case <-time.After(50 * time.Millisecond): + } + + // expired lease can be revoked + if err := le.Revoke(l.ID); err != nil { + t.Fatalf("failed to revoke expired lease: %v", err) + } + + select { + case <-donec: + case <-time.After(10 * time.Second): + t.Fatalf("renew has not returned after lease revocation") + } +} + +func TestLessorExpireAndDemote(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + testMinTTL := int64(1) + + le := newLessor(be, testMinTTL) + defer le.Stop() + + le.Promote(1 * time.Second) + l, err := le.Grant(1, testMinTTL) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + + select { + case el := <-le.ExpiredLeasesC(): + if el[0].ID != l.ID { + t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to receive expired lease") + } + + donec := make(chan struct{}) + go func() { + // expired lease cannot be renewed + if _, err := le.Renew(l.ID); err != ErrNotPrimary { + t.Fatalf("unexpected renew: %v", err) + } + donec <- struct{}{} + }() + + select { + case <-donec: + t.Fatalf("renew finished before demotion") + case <-time.After(50 * time.Millisecond): + } + + // demote will cause the renew request to fail with ErrNotPrimary + le.Demote() + + select { + case <-donec: + case <-time.After(10 * time.Second): + t.Fatalf("renew has not returned after lessor demotion") + } +} + type fakeDeleter struct { deleted []string } From 0f0c048e29ff63355d176d6a6217778a1d82561a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 5 Oct 2016 14:24:03 -0700 Subject: [PATCH 2/2] etcdserver: fix early lessor promotion issue If we promote the lessor before finish applying all entries from the last term, we might incorrectly renew the already revoked leases. Here is an example: - Term 1: revoke lease A accepted by raft - Old leader failed, new election happened - Term 2: promote - Term 2: keep alive A succeed. A now has 10 seconds TTL - Term 2: revoke lease A from Term 1 got committed and applied - Term 2: the lease A with 10 seconds TTL is revoked To solve this, the new leader MUST apply all entries from old term before promote its lessor to start accept renew requests. --- etcdserver/raft.go | 6 +----- etcdserver/server.go | 5 +++++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 27170b602..754aea6b0 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -172,12 +172,8 @@ func (r *raftNode) start(s *EtcdServer) { atomic.StoreUint64(&r.lead, rd.SoftState.Lead) if rd.RaftState == raft.StateLeader { islead = true - // TODO: raft should send server a notification through chan when - // it promotes or demotes instead of modifying server directly. syncC = r.s.SyncTicker - if r.s.lessor != nil { - r.s.lessor.Promote(r.s.Cfg.electionTimeout()) - } + // TODO: remove the nil checking // current test utility does not provide the stats if r.s.stats != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index f5e58f51d..fa273a0ae 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1210,6 +1210,11 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { case s.forceVersionC <- struct{}{}: default: } + // promote lessor when the local member is leader and finished + // applying all entries from the last term. + if s.isLeader() { + s.lessor.Promote(s.Cfg.electionTimeout()) + } return }