etcd-runner: add rate limiting in doRounds()

This commit is contained in:
fanmin shi 2017-04-25 10:22:11 -07:00
parent b94b8b5707
commit fa85445ef8

View File

@ -15,6 +15,7 @@
package command
import (
"context"
"fmt"
"log"
"sync"
@ -23,25 +24,18 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/spf13/cobra"
"golang.org/x/time/rate"
)
// shared flags
var (
rounds int // total number of rounds the operation needs to be performed
totalClientConnections int // total number of client connections to be made with server
noOfPrefixes int // total number of prefixes which will be watched upon
watchPerPrefix int // number of watchers per prefix
reqRate int // put request per second
totalKeys int // total number of keys for operation
runningTime time.Duration // time for which operation should be performed
totalClientConnections int // total number of client connections to be made with server
endpoints []string
dialTimeout time.Duration
rounds int // total number of rounds to run; set to <= 0 to run forever.
reqRate int // maximum number of requests per second.
)
// GlobalFlags are flags that defined globally
// and are inherited to all sub-commands.
type GlobalFlags struct {
Endpoints []string
DialTimeout time.Duration
}
type roundClient struct {
c *clientv3.Client
progress int
@ -61,16 +55,21 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client {
return c
}
func doRounds(rcs []roundClient, rounds int) {
func doRounds(rcs []roundClient, rounds int, requests int) {
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(len(rcs))
finished := make(chan struct{})
limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
for i := range rcs {
go func(rc *roundClient) {
defer wg.Done()
for rc.progress < rounds {
for rc.progress < rounds || rounds <= 0 {
if err := limiter.WaitN(context.Background(), requests/len(rcs)); err != nil {
log.Panicf("rate limiter error %v", err)
}
for rc.acquire() != nil { /* spin */
}
@ -85,7 +84,7 @@ func doRounds(rcs []roundClient, rounds int) {
finished <- struct{}{}
mu.Lock()
for rc.release() != nil {
for rc.release() != nil { /* spin */
mu.Unlock()
mu.Lock()
}
@ -95,7 +94,7 @@ func doRounds(rcs []roundClient, rounds int) {
}
start := time.Now()
for i := 1; i < len(rcs)*rounds+1; i++ {
for i := 1; i < len(rcs)*rounds+1 || rounds <= 0; i++ {
select {
case <-finished:
if i%100 == 0 {