diff --git a/tools/benchmark/cmd/report.go b/tools/benchmark/cmd/report.go index 04459bb31..99173bb12 100644 --- a/tools/benchmark/cmd/report.go +++ b/tools/benchmark/cmd/report.go @@ -53,6 +53,17 @@ func printReport(size int, results chan *result, total time.Duration) { errorDist: make(map[string]int), } r.finalize() + r.print() +} + +func printRate(size int, results chan *result, total time.Duration) { + r := &report{ + results: results, + total: total, + errorDist: make(map[string]int), + } + r.finalize() + fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps) } func (r *report) finalize() { @@ -68,7 +79,6 @@ func (r *report) finalize() { default: r.rps = float64(len(r.lats)) / r.total.Seconds() r.average = r.avgTotal / float64(len(r.lats)) - r.print() return } } diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go new file mode 100644 index 000000000..101c648b3 --- /dev/null +++ b/tools/benchmark/cmd/watch.go @@ -0,0 +1,187 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "fmt" + "os" + "time" + + "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" +) + +// watchCmd represents the watch command +var watchCmd = &cobra.Command{ + Use: "watch", + Short: "Benchmark watch", + Long: `Benchmark watch tests the performance of processing watch requests and +sending events to watchers. It tests the sending performance by +changing the value of the watched keys with concurrent put +requests. + +During the test, each watcher watches (--total/--watchers) keys +(a watcher might watch on the same key multiple times if +--watched-key-total is small). + +Each key is watched by (--total/--watched-key-total) watchers. +`, + Run: watchFunc, +} + +var ( + watchTotalStreams int + watchTotal int + watchedKeyTotal int + + watchPutRate int + watchPutTotal int +) + +func init() { + RootCmd.AddCommand(watchCmd) + watchCmd.Flags().IntVar(&watchTotalStreams, "watchers", 10000, "Total number of watchers") + watchCmd.Flags().IntVar(&watchTotal, "total", 100000, "Total number of watch requests") + watchCmd.Flags().IntVar(&watchedKeyTotal, "watched-key-total", 10000, "Total number of keys to be watched") + + watchCmd.Flags().IntVar(&watchPutRate, "put-rate", 100, "Number of keys to put per second") + watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 10000, "Number of put requests") +} + +func watchFunc(cmd *cobra.Command, args []string) { + watched := make([][]byte, watchedKeyTotal) + for i := range watched { + watched[i] = mustRandBytes(32) + } + + requests := make(chan *etcdserverpb.WatchRequest, watchTotal) + + conns := make([]*grpc.ClientConn, totalConns) + for i := range conns { + conns[i] = mustCreateConn() + } + + clients := make([]etcdserverpb.WatchClient, totalClients) + for i := range clients { + clients[i] = etcdserverpb.NewWatchClient(conns[i%int(totalConns)]) + } + + streams := make([]etcdserverpb.Watch_WatchClient, watchTotalStreams) + var err error + for i := range streams { + streams[i], err = clients[i%int(totalClients)].Watch(context.TODO()) + if err != nil { + fmt.Fprintln(os.Stderr, "Failed to create watch stream:", err) + os.Exit(1) + } + } + + for i := range streams { + wg.Add(1) + go doWatch(streams[i], requests) + } + + // watching phase + results = make(chan *result, watchTotal) + bar = pb.New(watchTotal) + + bar.Format("Bom !") + bar.Start() + + start := time.Now() + for i := 0; i < watchTotal; i++ { + r := &etcdserverpb.WatchRequest{ + Key: watched[i%(len(watched))], + } + requests <- r + } + close(requests) + + wg.Wait() + bar.Finish() + fmt.Printf("Watch creation summary:\n") + printRate(watchTotal, results, time.Now().Sub(start)) + + // put phase + kv := etcdserverpb.NewKVClient(conns[0]) + // total number of puts * number of watchers on each key + eventsTotal := watchPutTotal * (watchTotal / watchedKeyTotal) + + results = make(chan *result, eventsTotal) + bar = pb.New(eventsTotal) + + bar.Format("Bom !") + bar.Start() + + start = time.Now() + + // TODO: create multiple clients to do put to increase throughput + // TODO: use a real rate-limiter instead of sleep. + for i := 0; i < watchPutTotal; i++ { + r := &etcdserverpb.PutRequest{ + Key: watched[i%(len(watched))], + Value: []byte("data"), + } + _, err := kv.Put(context.TODO(), r) + if err != nil { + fmt.Fprintln(os.Stderr, "Failed to put:", err) + } + time.Sleep(time.Second / time.Duration(watchPutRate)) + } + + for { + if len(results) == eventsTotal { + break + } + time.Sleep(50 * time.Millisecond) + } + + bar.Finish() + fmt.Printf("Watch events received summary:\n") + printRate(eventsTotal, results, time.Now().Sub(start)) +} + +func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan *etcdserverpb.WatchRequest) { + for r := range requests { + st := time.Now() + err := stream.Send(r) + var errStr string + if err != nil { + errStr = err.Error() + } + results <- &result{ + errStr: errStr, + duration: time.Since(st), + } + bar.Increment() + } + wg.Done() + + for { + _, err := stream.Recv() + var errStr string + if err != nil { + errStr = err.Error() + } + results <- &result{ + errStr: errStr, + } + bar.Increment() + } +}