diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 27170b602..754aea6b0 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -172,12 +172,8 @@ func (r *raftNode) start(s *EtcdServer) { atomic.StoreUint64(&r.lead, rd.SoftState.Lead) if rd.RaftState == raft.StateLeader { islead = true - // TODO: raft should send server a notification through chan when - // it promotes or demotes instead of modifying server directly. syncC = r.s.SyncTicker - if r.s.lessor != nil { - r.s.lessor.Promote(r.s.Cfg.electionTimeout()) - } + // TODO: remove the nil checking // current test utility does not provide the stats if r.s.stats != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index f5e58f51d..fa273a0ae 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1210,6 +1210,11 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { case s.forceVersionC <- struct{}{}: default: } + // promote lessor when the local member is leader and finished + // applying all entries from the last term. + if s.isLeader() { + s.lessor.Promote(s.Cfg.electionTimeout()) + } return } diff --git a/lease/lessor.go b/lease/lessor.go index 293126532..52c2c67d2 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -111,20 +111,9 @@ type Lessor interface { type lessor struct { mu sync.Mutex - // primary indicates if this lessor is the primary lessor. The primary - // lessor manages lease expiration and renew. - // - // in etcd, raft leader is the primary. Thus there might be two primary - // leaders at the same time (raft allows concurrent leader but with different term) - // for at most a leader election timeout. - // The old primary leader cannot affect the correctness since its proposal has a - // smaller term and will not be committed. - // - // TODO: raft follower do not forward lease management proposals. There might be a - // very small window (within second normally which depends on go scheduling) that - // a raft follow is the primary between the raft leader demotion and lessor demotion. - // Usually this should not be a problem. Lease should not be that sensitive to timing. - primary bool + // demotec is set when the lessor is the primary. + // demotec will be closed if the lessor is demoted. + demotec chan struct{} // TODO: probably this should be a heap with a secondary // id index. @@ -174,6 +163,23 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { return l } +// isPrimary indicates if this lessor is the primary lessor. The primary +// lessor manages lease expiration and renew. +// +// in etcd, raft leader is the primary. Thus there might be two primary +// leaders at the same time (raft allows concurrent leader but with different term) +// for at most a leader election timeout. +// The old primary leader cannot affect the correctness since its proposal has a +// smaller term and will not be committed. +// +// TODO: raft follower do not forward lease management proposals. There might be a +// very small window (within second normally which depends on go scheduling) that +// a raft follow is the primary between the raft leader demotion and lessor demotion. +// Usually this should not be a problem. Lease should not be that sensitive to timing. +func (le *lessor) isPrimary() bool { + return le.demotec != nil +} + func (le *lessor) SetRangeDeleter(rd RangeDeleter) { le.mu.Lock() defer le.mu.Unlock() @@ -188,7 +194,12 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { // TODO: when lessor is under high load, it should give out lease // with longer TTL to reduce renew load. - l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})} + l := &Lease{ + ID: id, + TTL: ttl, + itemSet: make(map[LeaseItem]struct{}), + revokec: make(chan struct{}), + } le.mu.Lock() defer le.mu.Unlock() @@ -201,7 +212,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { l.TTL = le.minLeaseTTL } - if le.primary { + if le.isPrimary() { l.refresh(0) } else { l.forever() @@ -221,6 +232,7 @@ func (le *lessor) Revoke(id LeaseID) error { le.mu.Unlock() return ErrLeaseNotFound } + defer close(l.revokec) // unlock before doing external work le.mu.Unlock() @@ -264,18 +276,40 @@ func (le *lessor) Revoke(id LeaseID) error { // has expired, an error will be returned. func (le *lessor) Renew(id LeaseID) (int64, error) { le.mu.Lock() - defer le.mu.Unlock() - if !le.primary { + unlock := func() { le.mu.Unlock() } + defer func() { unlock() }() + + if !le.isPrimary() { // forward renew request to primary instead of returning error. return -1, ErrNotPrimary } + demotec := le.demotec + l := le.leaseMap[id] if l == nil { return -1, ErrLeaseNotFound } + if l.expired() { + le.mu.Unlock() + unlock = func() {} + select { + // A expired lease might be pending for revoking or going through + // quorum to be revoked. To be accurate, renew request must wait for the + // deletion to complete. + case <-l.revokec: + return -1, ErrLeaseNotFound + // The expired lease might fail to be revoked if the primary changes. + // The caller will retry on ErrNotPrimary. + case <-demotec: + return -1, ErrNotPrimary + case <-le.stopC: + return -1, ErrNotPrimary + } + } + l.refresh(0) return l.TTL, nil } @@ -290,7 +324,7 @@ func (le *lessor) Promote(extend time.Duration) { le.mu.Lock() defer le.mu.Unlock() - le.primary = true + le.demotec = make(chan struct{}) // refresh the expiries of all leases. for _, l := range le.leaseMap { @@ -307,7 +341,10 @@ func (le *lessor) Demote() { l.forever() } - le.primary = false + if le.demotec != nil { + close(le.demotec) + le.demotec = nil + } } // Attach attaches items to the lease with given ID. When the lease @@ -372,7 +409,7 @@ func (le *lessor) runLoop() { var ls []*Lease le.mu.Lock() - if le.primary { + if le.isPrimary() { ls = le.findExpiredLeases() } le.mu.Unlock() @@ -401,12 +438,11 @@ func (le *lessor) runLoop() { // leases that needed to be revoked. func (le *lessor) findExpiredLeases() []*Lease { leases := make([]*Lease, 0, 16) - now := time.Now() for _, l := range le.leaseMap { // TODO: probably should change to <= 100-500 millisecond to // make up committing latency. - if l.expiry.Sub(now) <= 0 { + if l.expired() { leases = append(leases, l) } } @@ -439,6 +475,7 @@ func (le *lessor) initAndRecover() { // set expiry to forever, refresh when promoted itemSet: make(map[LeaseItem]struct{}), expiry: forever, + revokec: make(chan struct{}), } } tx.Unlock() @@ -452,7 +489,12 @@ type Lease struct { itemSet map[LeaseItem]struct{} // expiry time in unixnano - expiry time.Time + expiry time.Time + revokec chan struct{} +} + +func (l Lease) expired() bool { + return l.Remaining() <= 0 } func (l Lease) persistTo(b backend.Backend) { diff --git a/lease/lessor_test.go b/lease/lessor_test.go index a3a15a958..3f15a0d34 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -221,6 +221,108 @@ func TestLessorRecover(t *testing.T) { } } +func TestLessorExpire(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + testMinTTL := int64(1) + + le := newLessor(be, testMinTTL) + defer le.Stop() + + le.Promote(1 * time.Second) + l, err := le.Grant(1, testMinTTL) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + + select { + case el := <-le.ExpiredLeasesC(): + if el[0].ID != l.ID { + t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to receive expired lease") + } + + donec := make(chan struct{}) + go func() { + // expired lease cannot be renewed + if _, err := le.Renew(l.ID); err != ErrLeaseNotFound { + t.Fatalf("unexpected renew") + } + donec <- struct{}{} + }() + + select { + case <-donec: + t.Fatalf("renew finished before lease revocation") + case <-time.After(50 * time.Millisecond): + } + + // expired lease can be revoked + if err := le.Revoke(l.ID); err != nil { + t.Fatalf("failed to revoke expired lease: %v", err) + } + + select { + case <-donec: + case <-time.After(10 * time.Second): + t.Fatalf("renew has not returned after lease revocation") + } +} + +func TestLessorExpireAndDemote(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + testMinTTL := int64(1) + + le := newLessor(be, testMinTTL) + defer le.Stop() + + le.Promote(1 * time.Second) + l, err := le.Grant(1, testMinTTL) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + + select { + case el := <-le.ExpiredLeasesC(): + if el[0].ID != l.ID { + t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to receive expired lease") + } + + donec := make(chan struct{}) + go func() { + // expired lease cannot be renewed + if _, err := le.Renew(l.ID); err != ErrNotPrimary { + t.Fatalf("unexpected renew: %v", err) + } + donec <- struct{}{} + }() + + select { + case <-donec: + t.Fatalf("renew finished before demotion") + case <-time.After(50 * time.Millisecond): + } + + // demote will cause the renew request to fail with ErrNotPrimary + le.Demote() + + select { + case <-donec: + case <-time.After(10 * time.Second): + t.Fatalf("renew has not returned after lessor demotion") + } +} + type fakeDeleter struct { deleted []string }