From d582fdcc1bc2e540a9af00f27bae66025e295b28 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Thu, 20 Oct 2016 15:39:23 -0700 Subject: [PATCH] functional-tester: add rate limiter to lease stresser too many leases created can cause compaction to timeout. adding a rate limiter limits number of leases and attched keys. --- .../etcd-tester/lease_stresser.go | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/tools/functional-tester/etcd-tester/lease_stresser.go b/tools/functional-tester/etcd-tester/lease_stresser.go index 03b37240a..0af31bfb5 100644 --- a/tools/functional-tester/etcd-tester/lease_stresser.go +++ b/tools/functional-tester/etcd-tester/lease_stresser.go @@ -24,11 +24,16 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "golang.org/x/time/rate" "google.golang.org/grpc" ) -// time to live for lease -const TTL = 30 +const ( + // time to live for lease + TTL = 30 + // leasesStressRoundPs indicates the rate that leaseStresser.run() creates and deletes leases per second + leasesStressRoundPs = 1 +) type leaseStressConfig struct { numLeases int @@ -44,6 +49,8 @@ type leaseStresser struct { lc pb.LeaseClient ctx context.Context + rateLimiter *rate.Limiter + success int failure int numLeases int @@ -115,10 +122,13 @@ func newLeaseStresserBuilder(s string, lsConfig *leaseStressConfig) leaseStresse } case "default": return func(mem *member) Stresser { + // limit lease stresser to run 1 round per second + l := rate.NewLimiter(rate.Limit(leasesStressRoundPs), leasesStressRoundPs) return &leaseStresser{ endpoint: mem.grpcAddr(), numLeases: lsConfig.numLeases, keysPerLease: lsConfig.keysPerLease, + rateLimiter: l, } } default: @@ -170,13 +180,16 @@ func (ls *leaseStresser) Stress() error { func (ls *leaseStresser) run() { defer ls.runWg.Done() ls.restartKeepAlives() - for ls.ctx.Err() == nil { - plog.Debugf("creating lease on %v ", ls.endpoint) + for { + if err := ls.rateLimiter.Wait(ls.ctx); err == context.Canceled { + return + } + plog.Debugf("creating lease on %v", ls.endpoint) ls.createLeases() - plog.Debugf("done creating lease on %v ", ls.endpoint) - plog.Debugf("dropping lease on %v ", ls.endpoint) + plog.Debugf("done creating lease on %v", ls.endpoint) + plog.Debugf("dropping lease on %v", ls.endpoint) ls.randomlyDropLeases() - plog.Debugf("done dropping lease on %v ", ls.endpoint) + plog.Debugf("done dropping lease on %v", ls.endpoint) } } @@ -201,7 +214,7 @@ func (ls *leaseStresser) createLeases() { plog.Errorf("lease creation error: (%v)", err) return } - plog.Debugf("lease %v created ", leaseID) + plog.Debugf("lease %v created", leaseID) // if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map // because invariant check on the lease will fail due to keys not found if err := ls.attachKeysWithLease(leaseID); err != nil { @@ -232,7 +245,7 @@ func (ls *leaseStresser) randomlyDropLeases() { if !dropped { return } - plog.Debugf("lease %v dropped ", leaseID) + plog.Debugf("lease %v dropped", leaseID) ls.revokedLeases.add(leaseID, time.Now()) ls.aliveLeases.remove(leaseID) }(l) @@ -311,20 +324,20 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID}) plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID) if err != nil { - plog.Debugf("keepLeaseAlive stream sends lease %v error (%v) ", leaseID, err) + plog.Debugf("keepLeaseAlive stream sends lease %v error (%v)", leaseID, err) continue } leaseRenewTime := time.Now() plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeed", leaseID) respRC, err := stream.Recv() if err != nil { - plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v) ", leaseID, err) + plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v)", leaseID, err) continue } // lease expires after TTL become 0 // don't send keepalive if the lease has expired if respRC.TTL <= 0 { - plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0 ", leaseID) + plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0", leaseID) ls.aliveLeases.remove(leaseID) return }