From 99e7449f44b3501a0044a71c08f882c7994fc586 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Tue, 9 Feb 2016 16:35:15 +0900 Subject: [PATCH] tools/benchmark: revive watch benchmark Current watch benchmark seems to be broken. This commit revives it. --- tools/benchmark/cmd/watch.go | 60 ++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 12 deletions(-) diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index 3f3ec0a68..1a73f33c0 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -17,6 +17,7 @@ package cmd import ( "fmt" "os" + "sync/atomic" "time" "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -51,6 +52,14 @@ var ( watchPutRate int watchPutTotal int + + eventsTotal int + + nrWatchCompleted int32 + nrRecvCompleted int32 + watchCompletedNotifier chan struct{} + putStartNotifier chan struct{} + recvCompletedNotifier chan struct{} ) func init() { @@ -83,10 +92,7 @@ func watchFunc(cmd *cobra.Command, args []string) { } } - for i := range streams { - wg.Add(1) - go doWatch(streams[i], requests) - } + putStartNotifier = make(chan struct{}) // watching phase results = make(chan result) @@ -97,6 +103,12 @@ func watchFunc(cmd *cobra.Command, args []string) { pdoneC := printRate(results) + atomic.StoreInt32(&nrWatchCompleted, int32(0)) + watchCompletedNotifier = make(chan struct{}) + for i := range streams { + go doWatch(streams[i], requests) + } + go func() { for i := 0; i < watchTotal; i++ { requests <- etcdserverpb.WatchRequest{ @@ -107,7 +119,7 @@ func watchFunc(cmd *cobra.Command, args []string) { close(requests) }() - wg.Wait() + <-watchCompletedNotifier bar.Finish() fmt.Printf("Watch creation summary:\n") @@ -116,19 +128,21 @@ func watchFunc(cmd *cobra.Command, args []string) { // put phase // total number of puts * number of watchers on each key - eventsTotal := watchPutTotal * (watchTotal / watchedKeyTotal) - + eventsTotal = watchPutTotal * (watchTotal / watchedKeyTotal) results = make(chan result) bar = pb.New(eventsTotal) bar.Format("Bom !") bar.Start() + atomic.StoreInt32(&nrRecvCompleted, 0) + recvCompletedNotifier = make(chan struct{}) + close(putStartNotifier) + putreqc := make(chan etcdserverpb.PutRequest) for i := 0; i < watchPutTotal; i++ { - wg.Add(1) - go doPut(context.TODO(), clients[i%len(clients)].KV, putreqc) + go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc) } pdoneC = printRate(results) @@ -145,7 +159,7 @@ func watchFunc(cmd *cobra.Command, args []string) { close(putreqc) }() - wg.Wait() + <-recvCompletedNotifier bar.Finish() fmt.Printf("Watch events received summary:\n") close(results) @@ -163,14 +177,36 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb results <- result{errStr: errStr, duration: time.Since(st)} bar.Increment() } + atomic.AddInt32(&nrWatchCompleted, 1) + if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) { + watchCompletedNotifier <- struct{}{} + } + + <-putStartNotifier + for { + st := time.Now() _, err := stream.Recv() var errStr string if err != nil { errStr = err.Error() } - results <- result{errStr: errStr} + results <- result{errStr: errStr, duration: time.Since(st)} bar.Increment() + + atomic.AddInt32(&nrRecvCompleted, 1) + if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) { + recvCompletedNotifier <- struct{}{} + } + } +} + +func doPutForWatch(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) { + for r := range requests { + _, err := client.Put(ctx, &r) + if err != nil { + fmt.Fprintln(os.Stderr, "failed to Put for watch benchmark: %s", err) + os.Exit(1) + } } - wg.Done() }