From fa85445ef8fb71cf2bffac898e42c172f3fcbe3e Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 25 Apr 2017 10:22:11 -0700 Subject: [PATCH] etcd-runner: add rate limiting in doRounds() --- .../etcd-runner/command/global.go | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/tools/functional-tester/etcd-runner/command/global.go b/tools/functional-tester/etcd-runner/command/global.go index 38d6de38c..1d26f2047 100644 --- a/tools/functional-tester/etcd-runner/command/global.go +++ b/tools/functional-tester/etcd-runner/command/global.go @@ -15,6 +15,7 @@ package command import ( + "context" "fmt" "log" "sync" @@ -23,25 +24,18 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/spf13/cobra" + "golang.org/x/time/rate" ) +// shared flags var ( - rounds int // total number of rounds the operation needs to be performed - totalClientConnections int // total number of client connections to be made with server - noOfPrefixes int // total number of prefixes which will be watched upon - watchPerPrefix int // number of watchers per prefix - reqRate int // put request per second - totalKeys int // total number of keys for operation - runningTime time.Duration // time for which operation should be performed + totalClientConnections int // total number of client connections to be made with server + endpoints []string + dialTimeout time.Duration + rounds int // total number of rounds to run; set to <= 0 to run forever. + reqRate int // maximum number of requests per second. ) -// GlobalFlags are flags that defined globally -// and are inherited to all sub-commands. -type GlobalFlags struct { - Endpoints []string - DialTimeout time.Duration -} - type roundClient struct { c *clientv3.Client progress int @@ -61,16 +55,21 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client { return c } -func doRounds(rcs []roundClient, rounds int) { +func doRounds(rcs []roundClient, rounds int, requests int) { var mu sync.Mutex var wg sync.WaitGroup wg.Add(len(rcs)) finished := make(chan struct{}) + limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate) for i := range rcs { go func(rc *roundClient) { defer wg.Done() - for rc.progress < rounds { + for rc.progress < rounds || rounds <= 0 { + if err := limiter.WaitN(context.Background(), requests/len(rcs)); err != nil { + log.Panicf("rate limiter error %v", err) + } + for rc.acquire() != nil { /* spin */ } @@ -85,7 +84,7 @@ func doRounds(rcs []roundClient, rounds int) { finished <- struct{}{} mu.Lock() - for rc.release() != nil { + for rc.release() != nil { /* spin */ mu.Unlock() mu.Lock() } @@ -95,7 +94,7 @@ func doRounds(rcs []roundClient, rounds int) { } start := time.Now() - for i := 1; i < len(rcs)*rounds+1; i++ { + for i := 1; i < len(rcs)*rounds+1 || rounds <= 0; i++ { select { case <-finished: if i%100 == 0 {