From 9ca84e814f16bd3a2a96a91449d304908bc51717 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 19 May 2016 09:57:35 -0700 Subject: [PATCH] benchmark: fix watch command Fix https://github.com/coreos/etcd/issues/5099. --- tools/benchmark/cmd/watch.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index 7468f4f31..6909c82d2 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -88,6 +88,7 @@ func watchFunc(cmd *cobra.Command, args []string) { } watched := make([]string, watchedKeyTotal) + numWatchers := make(map[string]int) for i := range watched { k := make([]byte, watchKeySize) if watchSeqKeys { @@ -96,6 +97,7 @@ func watchFunc(cmd *cobra.Command, args []string) { binary.PutVarint(k, int64(rand.Intn(watchKeySpaceSize))) } watched[i] = string(k) + numWatchers[watched[i]] = numWatchers[watched[i]] + 1 } requests := make(chan string, totalClients) @@ -137,8 +139,10 @@ func watchFunc(cmd *cobra.Command, args []string) { <-pdoneC // put phase - // total number of puts * number of watchers on each key - eventsTotal = watchPutTotal * (watchTotal / watchedKeyTotal) + eventsTotal = 0 + for i := 0; i < watchPutTotal; i++ { + eventsTotal += numWatchers[watched[i%len(watched)]] + } results = make(chan result) bar = pb.New(eventsTotal) @@ -157,7 +161,7 @@ func watchFunc(cmd *cobra.Command, args []string) { pdoneC = printRate(results) go func() { - for i := 0; i < eventsTotal; i++ { + for i := 0; i < watchPutTotal; i++ { putreqc <- v3.OpPut(watched[i%(len(watched))], "data") // TODO: use a real rate-limiter instead of sleep. time.Sleep(time.Second / time.Duration(watchPutRate)) @@ -165,9 +169,7 @@ func watchFunc(cmd *cobra.Command, args []string) { close(putreqc) }() - for range streams { - <-recvCompletedNotifier - } + <-recvCompletedNotifier bar.Finish() fmt.Printf("Watch events received summary:\n") close(results) @@ -194,16 +196,15 @@ func doWatch(stream v3.Watcher, requests <-chan string) { func recvWatchChan(wch v3.WatchChan) { for range wch { - if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) { - recvCompletedNotifier <- struct{}{} - break - } - st := time.Now() results <- result{duration: time.Since(st), happened: time.Now()} bar.Increment() atomic.AddInt32(&nrRecvCompleted, 1) + if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) { + recvCompletedNotifier <- struct{}{} + break + } } }