diff --git a/tools/benchmark/cmd/watch_latency.go b/tools/benchmark/cmd/watch_latency.go index 0f1f5db52..3a070d260 100644 --- a/tools/benchmark/cmd/watch_latency.go +++ b/tools/benchmark/cmd/watch_latency.go @@ -17,9 +17,10 @@ package cmd import ( "fmt" "os" + "sync" "time" - v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/pkg/report" "github.com/spf13/cobra" @@ -47,19 +48,23 @@ var ( func init() { RootCmd.AddCommand(watchLatencyCmd) - watchLatencyCmd.Flags().IntVar(&watchLTotal, "total", 10000, "Total number of watch responses.") + watchLatencyCmd.Flags().IntVar(&watchLTotal, "total", 10000, "Total number of put requests") watchLatencyCmd.Flags().IntVar(&watchLPutRate, "put-rate", 100, "Number of keys to put per second") - watchLatencyCmd.Flags().IntVar(&watchLKeySize, "key-size", 32, "Key size of watch request") - watchLatencyCmd.Flags().IntVar(&watchLValueSize, "val-size", 32, "Val size of watch request") + watchLatencyCmd.Flags().IntVar(&watchLKeySize, "key-size", 32, "Key size of watch response") + watchLatencyCmd.Flags().IntVar(&watchLValueSize, "val-size", 32, "Value size of watch response") } func watchLatencyFunc(cmd *cobra.Command, args []string) { key := string(mustRandBytes(watchLKeySize)) value := string(mustRandBytes(watchLValueSize)) - client := mustCreateConn() - stream := v3.NewWatcher(client) - wch := stream.Watch(context.TODO(), key) + clients := mustCreateClients(totalClients, totalConns) + putClient := mustCreateConn() + + wchs := make([]clientv3.WatchChan, len(clients)) + for i := range wchs { + wchs[i] = clients[i].Watch(context.TODO(), key) + } bar = pb.New(watchLTotal) bar.Format("Bom !") @@ -74,15 +79,29 @@ func watchLatencyFunc(cmd *cobra.Command, args []string) { if err := limiter.Wait(context.TODO()); err != nil { break } - _, err := client.Put(context.TODO(), string(key), value) - if err != nil { + var st time.Time + var wg sync.WaitGroup + wg.Add(len(clients)) + barrierc := make(chan struct{}) + for _, wch := range wchs { + ch := wch + go func() { + <-barrierc + <-ch + r.Results() <- report.Result{Start: st, End: time.Now()} + wg.Done() + }() + } + + if _, err := putClient.Put(context.TODO(), key, value); err != nil { fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err) os.Exit(1) } - st := time.Now() - <-wch - r.Results() <- report.Result{Err: err, Start: st, End: time.Now()} + + st = time.Now() + close(barrierc) + wg.Wait() bar.Increment() }