mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #10492 from nolouch/fix-lessor
lease: fix deadlock with Renew lease when the checkpointor is set
This commit is contained in:
commit
4b69cfc56b
@ -349,13 +349,10 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
|
|||||||
// Renew renews an existing lease. If the given lease does not exist or
|
// Renew renews an existing lease. If the given lease does not exist or
|
||||||
// has expired, an error will be returned.
|
// has expired, an error will be returned.
|
||||||
func (le *lessor) Renew(id LeaseID) (int64, error) {
|
func (le *lessor) Renew(id LeaseID) (int64, error) {
|
||||||
le.mu.Lock()
|
le.mu.RLock()
|
||||||
|
|
||||||
unlock := func() { le.mu.Unlock() }
|
|
||||||
defer func() { unlock() }()
|
|
||||||
|
|
||||||
if !le.isPrimary() {
|
if !le.isPrimary() {
|
||||||
// forward renew request to primary instead of returning error.
|
// forward renew request to primary instead of returning error.
|
||||||
|
le.mu.RUnlock()
|
||||||
return -1, ErrNotPrimary
|
return -1, ErrNotPrimary
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -363,12 +360,14 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
|
|||||||
|
|
||||||
l := le.leaseMap[id]
|
l := le.leaseMap[id]
|
||||||
if l == nil {
|
if l == nil {
|
||||||
|
le.mu.RUnlock()
|
||||||
return -1, ErrLeaseNotFound
|
return -1, ErrLeaseNotFound
|
||||||
}
|
}
|
||||||
|
// Clear remaining TTL when we renew if it is set
|
||||||
|
clearRemainingTTL := le.cp != nil && l.remainingTTL > 0
|
||||||
|
|
||||||
|
le.mu.RUnlock()
|
||||||
if l.expired() {
|
if l.expired() {
|
||||||
le.mu.Unlock()
|
|
||||||
unlock = func() {}
|
|
||||||
select {
|
select {
|
||||||
// A expired lease might be pending for revoking or going through
|
// A expired lease might be pending for revoking or going through
|
||||||
// quorum to be revoked. To be accurate, renew request must wait for the
|
// quorum to be revoked. To be accurate, renew request must wait for the
|
||||||
@ -387,13 +386,15 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
|
|||||||
// Clear remaining TTL when we renew if it is set
|
// Clear remaining TTL when we renew if it is set
|
||||||
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
|
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
|
||||||
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
|
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
|
||||||
if le.cp != nil && l.remainingTTL > 0 {
|
if clearRemainingTTL {
|
||||||
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
|
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
le.mu.Lock()
|
||||||
l.refresh(0)
|
l.refresh(0)
|
||||||
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
|
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
|
||||||
heap.Push(&le.leaseHeap, item)
|
heap.Push(&le.leaseHeap, item)
|
||||||
|
le.mu.Unlock()
|
||||||
|
|
||||||
leaseRenewed.Inc()
|
leaseRenewed.Inc()
|
||||||
return l.ttl, nil
|
return l.ttl, nil
|
||||||
|
@ -236,6 +236,50 @@ func TestLessorRenew(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLessorRenewWithCheckpointer(t *testing.T) {
|
||||||
|
lg := zap.NewNop()
|
||||||
|
dir, be := NewTestBackend(t)
|
||||||
|
defer be.Close()
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
|
||||||
|
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
|
||||||
|
for _, cp := range cp.GetCheckpoints() {
|
||||||
|
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer le.Stop()
|
||||||
|
// Set checkpointer
|
||||||
|
le.SetCheckpointer(fakerCheckerpointer)
|
||||||
|
le.Promote(0)
|
||||||
|
|
||||||
|
l, err := le.Grant(1, minLeaseTTL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to grant lease (%v)", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// manually change the ttl field
|
||||||
|
le.mu.Lock()
|
||||||
|
l.ttl = 10
|
||||||
|
l.remainingTTL = 10
|
||||||
|
le.mu.Unlock()
|
||||||
|
ttl, err := le.Renew(l.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to renew lease (%v)", err)
|
||||||
|
}
|
||||||
|
if ttl != l.ttl {
|
||||||
|
t.Errorf("ttl = %d, want %d", ttl, l.ttl)
|
||||||
|
}
|
||||||
|
if l.remainingTTL != 0 {
|
||||||
|
t.Fatalf("remianingTTL = %d, want %d", l.remainingTTL, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
l = le.Lookup(l.ID)
|
||||||
|
if l.Remaining() < 9*time.Second {
|
||||||
|
t.Errorf("failed to renew the lease")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
|
// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
|
||||||
// expire at the same time.
|
// expire at the same time.
|
||||||
func TestLessorRenewExtendPileup(t *testing.T) {
|
func TestLessorRenewExtendPileup(t *testing.T) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user