From 25deb436af9cb2de1682373b5b4d2a0958087de2 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Tue, 24 May 2022 10:37:35 +0800 Subject: [PATCH] fix the race condition between goroutine and channel on the same leases to be revoked --- server/etcdserver/server.go | 63 +++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a64939e34..ce5a65fda 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -842,33 +842,7 @@ func (s *EtcdServer) run() { f := func(context.Context) { s.applyAll(&ep, &ap) } sched.Schedule(f) case leases := <-expiredLeaseC: - s.GoAttach(func() { - // Increases throughput of expired leases deletion process through parallelization - c := make(chan struct{}, maxPendingRevokes) - for _, lease := range leases { - select { - case c <- struct{}{}: - case <-s.stopping: - return - } - lid := lease.ID - s.GoAttach(func() { - ctx := s.authStore.WithRoot(s.ctx) - _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)}) - if lerr == nil { - leaseExpired.Inc() - } else { - lg.Warn( - "failed to revoke lease", - zap.String("lease-id", fmt.Sprintf("%016x", lid)), - zap.Error(lerr), - ) - } - - <-c - }) - } - }) + s.revokeExpiredLeases(leases) case err := <-s.errorc: lg.Warn("server error", zap.Error(err)) lg.Warn("data-dir used by this member must be removed") @@ -883,6 +857,41 @@ func (s *EtcdServer) run() { } } +func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { + s.GoAttach(func() { + lg := s.Logger() + // Increases throughput of expired leases deletion process through parallelization + c := make(chan struct{}, maxPendingRevokes) + for _, curLease := range leases { + select { + case c <- struct{}{}: + case <-s.stopping: + return + } + + f := func(lid int64) { + s.GoAttach(func() { + ctx := s.authStore.WithRoot(s.ctx) + _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid}) + if lerr == nil { + leaseExpired.Inc() + } else { + lg.Warn( + "failed to revoke lease", + zap.String("lease-id", fmt.Sprintf("%016x", lid)), + zap.Error(lerr), + ) + } + + <-c + }) + } + + f(int64(curLease.ID)) + } + }) +} + // Cleanup removes allocated objects by EtcdServer.NewServer in // situation that EtcdServer::Start was not called (that takes care of cleanup). func (s *EtcdServer) Cleanup() {