mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #14087 from ahrtr/lease_revoke_race
[3.5] Backport two lease related bug fixes to 3.5
This commit is contained in:
commit
0be65da6cc
@ -1119,33 +1119,7 @@ func (s *EtcdServer) run() {
|
||||
f := func(context.Context) { s.applyAll(&ep, &ap) }
|
||||
sched.Schedule(f)
|
||||
case leases := <-expiredLeaseC:
|
||||
s.GoAttach(func() {
|
||||
// Increases throughput of expired leases deletion process through parallelization
|
||||
c := make(chan struct{}, maxPendingRevokes)
|
||||
for _, lease := range leases {
|
||||
select {
|
||||
case c <- struct{}{}:
|
||||
case <-s.stopping:
|
||||
return
|
||||
}
|
||||
lid := lease.ID
|
||||
s.GoAttach(func() {
|
||||
ctx := s.authStore.WithRoot(s.ctx)
|
||||
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
|
||||
if lerr == nil {
|
||||
leaseExpired.Inc()
|
||||
} else {
|
||||
lg.Warn(
|
||||
"failed to revoke lease",
|
||||
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
|
||||
zap.Error(lerr),
|
||||
)
|
||||
}
|
||||
|
||||
<-c
|
||||
})
|
||||
}
|
||||
})
|
||||
s.revokeExpiredLeases(leases)
|
||||
case err := <-s.errorc:
|
||||
lg.Warn("server error", zap.Error(err))
|
||||
lg.Warn("data-dir used by this member must be removed")
|
||||
@ -1160,6 +1134,41 @@ func (s *EtcdServer) run() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
|
||||
s.GoAttach(func() {
|
||||
lg := s.Logger()
|
||||
// Increases throughput of expired leases deletion process through parallelization
|
||||
c := make(chan struct{}, maxPendingRevokes)
|
||||
for _, curLease := range leases {
|
||||
select {
|
||||
case c <- struct{}{}:
|
||||
case <-s.stopping:
|
||||
return
|
||||
}
|
||||
|
||||
f := func(lid int64) {
|
||||
s.GoAttach(func() {
|
||||
ctx := s.authStore.WithRoot(s.ctx)
|
||||
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid})
|
||||
if lerr == nil {
|
||||
leaseExpired.Inc()
|
||||
} else {
|
||||
lg.Warn(
|
||||
"failed to revoke lease",
|
||||
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
|
||||
zap.Error(lerr),
|
||||
)
|
||||
}
|
||||
|
||||
<-c
|
||||
})
|
||||
}
|
||||
|
||||
f(int64(curLease.ID))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Cleanup removes allocated objects by EtcdServer.NewServer in
|
||||
// situation that EtcdServer::Start was not called (that takes care of cleanup).
|
||||
func (s *EtcdServer) Cleanup() {
|
||||
|
@ -654,10 +654,9 @@ func (le *lessor) revokeExpiredLeases() {
|
||||
// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
|
||||
// submits them to the checkpointer to persist them to the consensus log.
|
||||
func (le *lessor) checkpointScheduledLeases() {
|
||||
var cps []*pb.LeaseCheckpoint
|
||||
|
||||
// rate limit
|
||||
for i := 0; i < leaseCheckpointRate/2; i++ {
|
||||
var cps []*pb.LeaseCheckpoint
|
||||
le.mu.Lock()
|
||||
if le.isPrimary() {
|
||||
cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
|
||||
|
Loading…
x
Reference in New Issue
Block a user