From 61713345951d06eecdf9e27b7790ad50615a5cea Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 8 Jun 2017 15:12:50 -0700 Subject: [PATCH] benchmark: refactor watch benchmark --- tools/benchmark/cmd/watch.go | 230 ++++++++++++++++++++--------------- 1 file changed, 133 insertions(+), 97 deletions(-) diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index b73e4f20e..5b2f57fc9 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -15,6 +15,7 @@ package cmd import ( + "context" "encoding/binary" "fmt" "math/rand" @@ -22,11 +23,11 @@ import ( "sync/atomic" "time" - v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/pkg/report" "github.com/spf13/cobra" - "golang.org/x/net/context" + "golang.org/x/time/rate" "gopkg.in/cheggaaa/pb.v1" ) @@ -50,9 +51,9 @@ Each key is watched by (--total/--watched-key-total) watchers. } var ( - watchTotalStreams int - watchTotal int - watchedKeyTotal int + watchStreams int + watchWatchesPerStream int + watchedKeyTotal int watchPutRate int watchPutTotal int @@ -60,23 +61,27 @@ var ( watchKeySize int watchKeySpaceSize int watchSeqKeys bool - - eventsTotal int - - nrWatchCompleted int32 - nrRecvCompleted int32 - watchCompletedNotifier chan struct{} - recvCompletedNotifier chan struct{} ) +type watchedKeys struct { + watched []string + numWatchers map[string]int + + watches []clientv3.WatchChan + + // ctx to control all watches + ctx context.Context + cancel context.CancelFunc +} + func init() { RootCmd.AddCommand(watchCmd) - watchCmd.Flags().IntVar(&watchTotalStreams, "watchers", 10000, "Total number of watchers") - watchCmd.Flags().IntVar(&watchTotal, "total", 100000, "Total number of watch requests") - watchCmd.Flags().IntVar(&watchedKeyTotal, "watched-key-total", 10000, "Total number of keys to be watched") + watchCmd.Flags().IntVar(&watchStreams, "streams", 10, "Total watch streams") + watchCmd.Flags().IntVar(&watchWatchesPerStream, "watch-per-stream", 100, "Total watchers per stream") + watchCmd.Flags().IntVar(&watchedKeyTotal, "watched-key-total", 1, "Total number of keys to be watched") - watchCmd.Flags().IntVar(&watchPutRate, "put-rate", 100, "Number of keys to put per second") - watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 10000, "Number of put requests") + watchCmd.Flags().IntVar(&watchPutRate, "put-rate", 0, "Number of keys to put per second") + watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 1000, "Number of put requests") watchCmd.Flags().IntVar(&watchKeySize, "key-size", 32, "Key size of watch request") watchCmd.Flags().IntVar(&watchKeySpaceSize, "key-space-size", 1, "Maximum possible keys") @@ -88,9 +93,76 @@ func watchFunc(cmd *cobra.Command, args []string) { fmt.Fprintf(os.Stderr, "expected positive --key-space-size, got (%v)", watchKeySpaceSize) os.Exit(1) } + grpcConns := int(totalClients) + if totalClients > totalConns { + grpcConns = int(totalConns) + } + wantedConns := 1 + (watchStreams / 100) + if grpcConns < wantedConns { + fmt.Fprintf(os.Stderr, "warning: grpc limits 100 streams per client connection, have %d but need %d\n", grpcConns, wantedConns) + } + clients := mustCreateClients(totalClients, totalConns) + wk := newWatchedKeys() + benchMakeWatches(clients, wk) + benchPutWatches(clients, wk) +} +func benchMakeWatches(clients []*clientv3.Client, wk *watchedKeys) { + streams := make([]clientv3.Watcher, watchStreams) + for i := range streams { + streams[i] = clientv3.NewWatcher(clients[i%len(clients)]) + } + + keyc := make(chan string, watchStreams) + bar = pb.New(watchStreams * watchWatchesPerStream) + bar.Format("Bom !") + bar.Start() + + r := newReport() + rch := r.Results() + + wg.Add(len(streams) + 1) + wc := make(chan []clientv3.WatchChan, len(streams)) + for _, s := range streams { + go func(s clientv3.Watcher) { + defer wg.Done() + var ws []clientv3.WatchChan + for i := 0; i < watchWatchesPerStream; i++ { + k := <-keyc + st := time.Now() + wch := s.Watch(wk.ctx, k) + rch <- report.Result{Start: st, End: time.Now()} + ws = append(ws, wch) + bar.Increment() + } + wc <- ws + }(s) + } + go func() { + defer func() { + close(keyc) + wg.Done() + }() + for i := 0; i < watchStreams*watchWatchesPerStream; i++ { + key := wk.watched[i%len(wk.watched)] + keyc <- key + wk.numWatchers[key]++ + } + }() + + rc := r.Run() + wg.Wait() + bar.Finish() + close(r.Results()) + fmt.Printf("Watch creation summary:\n%s", <-rc) + + for i := 0; i < len(streams); i++ { + wk.watches = append(wk.watches, (<-wc)...) + } +} + +func newWatchedKeys() *watchedKeys { watched := make([]string, watchedKeyTotal) - numWatchers := make(map[string]int) for i := range watched { k := make([]byte, watchKeySize) if watchSeqKeys { @@ -100,112 +172,76 @@ func watchFunc(cmd *cobra.Command, args []string) { } watched[i] = string(k) } - - requests := make(chan string, totalClients) - - clients := mustCreateClients(totalClients, totalConns) - - streams := make([]v3.Watcher, watchTotalStreams) - for i := range streams { - streams[i] = v3.NewWatcher(clients[i%len(clients)]) + ctx, cancel := context.WithCancel(context.TODO()) + return &watchedKeys{ + watched: watched, + numWatchers: make(map[string]int), + ctx: ctx, + cancel: cancel, } +} - // watching phase - bar = pb.New(watchTotal) - bar.Format("Bom !") - bar.Start() - - atomic.StoreInt32(&nrWatchCompleted, int32(0)) - watchCompletedNotifier = make(chan struct{}) - - r := report.NewReportRate("%4.4f") - for i := range streams { - go doWatch(streams[i], requests, r.Results()) - } - - go func() { - for i := 0; i < watchTotal; i++ { - key := watched[i%len(watched)] - requests <- key - numWatchers[key]++ - } - close(requests) - }() - - rc := r.Run() - <-watchCompletedNotifier - bar.Finish() - close(r.Results()) - fmt.Printf("Watch creation summary:\n%s", <-rc) - - // put phase - eventsTotal = 0 +func benchPutWatches(clients []*clientv3.Client, wk *watchedKeys) { + eventsTotal := 0 for i := 0; i < watchPutTotal; i++ { - eventsTotal += numWatchers[watched[i%len(watched)]] + eventsTotal += wk.numWatchers[wk.watched[i%len(wk.watched)]] } bar = pb.New(eventsTotal) bar.Format("Bom !") bar.Start() - atomic.StoreInt32(&nrRecvCompleted, 0) - recvCompletedNotifier = make(chan struct{}) - putreqc := make(chan v3.Op) + r := newReport() - r = report.NewReportRate("%4.4f") - for i := 0; i < watchPutTotal; i++ { - go func(c *v3.Client) { - for op := range putreqc { - if _, err := c.Do(context.TODO(), op); err != nil { - fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err) - os.Exit(1) - } - } - }(clients[i%len(clients)]) + wg.Add(len(wk.watches)) + nrRxed := int32(eventsTotal) + for _, w := range wk.watches { + go func(wc clientv3.WatchChan) { + defer wg.Done() + recvWatchChan(wc, r.Results(), &nrRxed) + wk.cancel() + }(w) } + putreqc := make(chan clientv3.Op, len(clients)) go func() { + defer close(putreqc) for i := 0; i < watchPutTotal; i++ { - putreqc <- v3.OpPut(watched[i%(len(watched))], "data") - // TODO: use a real rate-limiter instead of sleep. - time.Sleep(time.Second / time.Duration(watchPutRate)) + putreqc <- clientv3.OpPut(wk.watched[i%(len(wk.watched))], "data") } - close(putreqc) }() - rc = r.Run() - <-recvCompletedNotifier + limit := rate.NewLimiter(rate.Limit(watchPutRate), 1) + for _, cc := range clients { + go func(c *clientv3.Client) { + for op := range putreqc { + if err := limit.Wait(context.TODO()); err != nil { + panic(err) + } + if _, err := c.Do(context.TODO(), op); err != nil { + panic(err) + } + } + }(cc) + } + + rc := r.Run() + wg.Wait() bar.Finish() close(r.Results()) fmt.Printf("Watch events received summary:\n%s", <-rc) + } -func doWatch(stream v3.Watcher, requests <-chan string, results chan<- report.Result) { - for r := range requests { - st := time.Now() - wch := stream.Watch(context.TODO(), r) - results <- report.Result{Start: st, End: time.Now()} - bar.Increment() - go recvWatchChan(wch, results) - } - atomic.AddInt32(&nrWatchCompleted, 1) - if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) { - watchCompletedNotifier <- struct{}{} - } -} - -func recvWatchChan(wch v3.WatchChan, results chan<- report.Result) { +func recvWatchChan(wch clientv3.WatchChan, results chan<- report.Result, nrRxed *int32) { for r := range wch { st := time.Now() for range r.Events { results <- report.Result{Start: st, End: time.Now()} bar.Increment() - atomic.AddInt32(&nrRecvCompleted, 1) - } - - if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) { - recvCompletedNotifier <- struct{}{} - break + if atomic.AddInt32(nrRxed, -1) <= 0 { + return + } } } }