diff --git a/pkg/report/doc.go b/pkg/report/doc.go new file mode 100644 index 000000000..1ebd119f5 --- /dev/null +++ b/pkg/report/doc.go @@ -0,0 +1,16 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package report generates human-readable benchmark reports. +package report diff --git a/pkg/report/report.go b/pkg/report/report.go new file mode 100644 index 000000000..ea4d17695 --- /dev/null +++ b/pkg/report/report.go @@ -0,0 +1,219 @@ +// Copyright 2014 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// the file is borrowed from github.com/rakyll/boom/boomer/print.go + +package report + +import ( + "fmt" + "math" + "sort" + "strings" + "time" +) + +const ( + barChar = "∎" +) + +// Result describes the timings for an operation. +type Result struct { + Start time.Time + End time.Time + Err error +} + +func (res *Result) Duration() time.Duration { return res.End.Sub(res.Start) } + +type report struct { + results chan Result + precision string + + avgTotal float64 + fastest float64 + slowest float64 + average float64 + stddev float64 + rps float64 + total time.Duration + + errorDist map[string]int + lats []float64 + + sps *secondPoints +} + +// Report processes a result stream until it is closed, then produces a +// string with information about the consumed result data. +type Report interface { + Results() chan<- Result + Run() <-chan string + String() string +} + +func NewReport(precision string) Report { + return &report{ + results: make(chan Result, 16), + precision: precision, + errorDist: make(map[string]int), + } +} + +func NewReportSample(precision string) Report { + r := NewReport(precision).(*report) + r.sps = newSecondPoints() + return r +} + +func (r *report) Results() chan<- Result { return r.results } + +func (r *report) Run() <-chan string { + donec := make(chan string, 1) + go func() { + defer close(donec) + r.processResults() + donec <- r.String() + }() + return donec +} + +func (r *report) String() (s string) { + if len(r.lats) > 0 { + s += fmt.Sprintf("\nSummary:\n") + s += fmt.Sprintf(" Total:\t%s.\n", r.sec2str(r.total.Seconds())) + s += fmt.Sprintf(" Slowest:\t%s.\n", r.sec2str(r.slowest)) + s += fmt.Sprintf(" Fastest:\t%s.\n", r.sec2str(r.fastest)) + s += fmt.Sprintf(" Average:\t%s.\n", r.sec2str(r.average)) + s += fmt.Sprintf(" Stddev:\t%s.\n", r.sec2str(r.stddev)) + s += fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps) + s += r.histogram() + s += r.latencies() + if r.sps != nil { + s += fmt.Sprintf("%v\n", r.sps.getTimeSeries()) + } + } + if len(r.errorDist) > 0 { + s += r.errors() + } + return s +} + +func (r *report) sec2str(sec float64) string { return fmt.Sprintf(r.precision+" secs", sec) } + +type reportRate struct{ *report } + +func NewReportRate(precision string) Report { + return &reportRate{NewReport(precision).(*report)} +} + +func (r *reportRate) String() string { + return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps) +} + +func (r *report) processResult(res *Result) { + if res.Err != nil { + r.errorDist[res.Err.Error()]++ + return + } + dur := res.Duration() + r.lats = append(r.lats, dur.Seconds()) + r.avgTotal += dur.Seconds() + if r.sps != nil { + r.sps.Add(res.Start, dur) + } +} + +func (r *report) processResults() { + st := time.Now() + for res := range r.results { + r.processResult(&res) + } + r.total = time.Since(st) + + r.rps = float64(len(r.lats)) / r.total.Seconds() + r.average = r.avgTotal / float64(len(r.lats)) + for i := range r.lats { + dev := r.lats[i] - r.average + r.stddev += dev * dev + } + r.stddev = math.Sqrt(r.stddev / float64(len(r.lats))) + sort.Float64s(r.lats) + if len(r.lats) > 0 { + r.fastest = r.lats[0] + r.slowest = r.lats[len(r.lats)-1] + } +} + +func (r *report) latencies() string { + pctls := []int{10, 25, 50, 75, 90, 95, 99} + data := make([]float64, len(pctls)) + j := 0 + for i := 0; i < len(r.lats) && j < len(pctls); i++ { + current := i * 100 / len(r.lats) + if current >= pctls[j] { + data[j] = r.lats[i] + j++ + } + } + s := fmt.Sprintf("\nLatency distribution:\n") + for i := 0; i < len(pctls); i++ { + if data[i] > 0 { + s += fmt.Sprintf(" %v%% in %s.\n", pctls[i], r.sec2str(data[i])) + } + } + return s +} + +func (r *report) histogram() string { + bc := 10 + buckets := make([]float64, bc+1) + counts := make([]int, bc+1) + bs := (r.slowest - r.fastest) / float64(bc) + for i := 0; i < bc; i++ { + buckets[i] = r.fastest + bs*float64(i) + } + buckets[bc] = r.slowest + var bi int + var max int + for i := 0; i < len(r.lats); { + if r.lats[i] <= buckets[bi] { + i++ + counts[bi]++ + if max < counts[bi] { + max = counts[bi] + } + } else if bi < len(buckets)-1 { + bi++ + } + } + s := fmt.Sprintf("\nResponse time histogram:\n") + for i := 0; i < len(buckets); i++ { + // Normalize bar lengths. + var barLen int + if max > 0 { + barLen = counts[i] * 40 / max + } + s += fmt.Sprintf(" "+r.precision+" [%v]\t|%v\n", buckets[i], counts[i], strings.Repeat(barChar, barLen)) + } + return s +} + +func (r *report) errors() string { + s := fmt.Sprintf("\nError distribution:\n") + for err, num := range r.errorDist { + s += fmt.Sprintf(" [%d]\t%s\n", num, err) + } + return s +} diff --git a/tools/benchmark/cmd/timeseries.go b/pkg/report/timeseries.go similarity index 99% rename from tools/benchmark/cmd/timeseries.go rename to pkg/report/timeseries.go index e535bf869..0cabb2339 100644 --- a/tools/benchmark/cmd/timeseries.go +++ b/pkg/report/timeseries.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cmd +package report import ( "bytes" diff --git a/tools/benchmark/cmd/timeseries_test.go b/pkg/report/timeseries_test.go similarity index 98% rename from tools/benchmark/cmd/timeseries_test.go rename to pkg/report/timeseries_test.go index 82bfa9b45..99874cd44 100644 --- a/tools/benchmark/cmd/timeseries_test.go +++ b/pkg/report/timeseries_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cmd +package report import ( "testing" diff --git a/tools/benchmark/cmd/lease.go b/tools/benchmark/cmd/lease.go index 6bc44fc0c..81dd30cb6 100644 --- a/tools/benchmark/cmd/lease.go +++ b/tools/benchmark/cmd/lease.go @@ -15,9 +15,12 @@ package cmd import ( + "fmt" "time" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/report" + "github.com/spf13/cobra" "golang.org/x/net/context" "gopkg.in/cheggaaa/pb.v1" @@ -40,53 +43,39 @@ func init() { } func leaseKeepaliveFunc(cmd *cobra.Command, args []string) { - results = make(chan result) requests := make(chan struct{}) - bar = pb.New(leaseKeepaliveTotal) - clients := mustCreateClients(totalClients, totalConns) + bar = pb.New(leaseKeepaliveTotal) bar.Format("Bom !") bar.Start() + r := newReport() for i := range clients { wg.Add(1) - go doLeaseKeepalive(context.Background(), clients[i].Lease, requests) + go func(c v3.Lease) { + defer wg.Done() + resp, err := c.Grant(context.Background(), 100) + if err != nil { + panic(err) + } + for _ = range requests { + st := time.Now() + _, err := c.KeepAliveOnce(context.TODO(), resp.ID) + r.Results() <- report.Result{Err: err, Start: st, End: time.Now()} + bar.Increment() + } + }(clients[i]) } - pdoneC := printReport(results) - for i := 0; i < leaseKeepaliveTotal; i++ { requests <- struct{}{} } close(requests) + rc := r.Run() wg.Wait() - + close(r.Results()) bar.Finish() - - close(results) - <-pdoneC -} - -func doLeaseKeepalive(ctx context.Context, client v3.Lease, requests <-chan struct{}) { - defer wg.Done() - - resp, err := client.Grant(ctx, 100) - if err != nil { - panic(err) - } - - for _ = range requests { - st := time.Now() - - _, err := client.KeepAliveOnce(ctx, resp.ID) - - var errStr string - if err != nil { - errStr = err.Error() - } - results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} - bar.Increment() - } + fmt.Printf("%s", <-rc) } diff --git a/tools/benchmark/cmd/mvcc-put.go b/tools/benchmark/cmd/mvcc-put.go index 6b35cdc74..992ff2331 100644 --- a/tools/benchmark/cmd/mvcc-put.go +++ b/tools/benchmark/cmd/mvcc-put.go @@ -22,6 +22,8 @@ import ( "time" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/pkg/report" + "github.com/spf13/cobra" ) @@ -100,14 +102,12 @@ func mvccPutFunc(cmd *cobra.Command, args []string) { keys := createBytesSlice(storageKeySize, totalNrKeys) vals := createBytesSlice(valueSize, totalNrKeys) - latencies := make([]time.Duration, totalNrKeys) - - minLat := time.Duration(1<<63 - 1) - maxLat := time.Duration(0) + r := newReport() + rrc := r.Results() + rc := r.Run() for i := 0; i < totalNrKeys; i++ { - begin := time.Now() - + st := time.Now() if txn { id := s.TxnBegin() if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { @@ -118,33 +118,9 @@ func mvccPutFunc(cmd *cobra.Command, args []string) { } else { s.Put(keys[i], vals[i], lease.NoLease) } - - end := time.Now() - - lat := end.Sub(begin) - latencies[i] = lat - if maxLat < lat { - maxLat = lat - } - if lat < minLat { - minLat = lat - } + rrc <- report.Result{Start: st, End: time.Now()} } - total := time.Duration(0) - - for _, lat := range latencies { - total += lat - } - - fmt.Printf("total: %v\n", total) - fmt.Printf("average: %v\n", total/time.Duration(totalNrKeys)) - fmt.Printf("rate: %4.4f\n", float64(totalNrKeys)/total.Seconds()) - fmt.Printf("minimum latency: %v\n", minLat) - fmt.Printf("maximum latency: %v\n", maxLat) - - // TODO: Currently this benchmark doesn't use the common histogram infrastructure. - // This is because an accuracy of the infrastructure isn't suitable for measuring - // performance of kv storage: - // https://github.com/coreos/etcd/pull/4070#issuecomment-167954149 + close(r.Results()) + fmt.Printf("%s", <-rc) } diff --git a/tools/benchmark/cmd/put.go b/tools/benchmark/cmd/put.go index aab8d5a42..200716dd7 100644 --- a/tools/benchmark/cmd/put.go +++ b/tools/benchmark/cmd/put.go @@ -22,6 +22,8 @@ import ( "time" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/report" + "github.com/spf13/cobra" "golang.org/x/net/context" "gopkg.in/cheggaaa/pb.v1" @@ -65,24 +67,28 @@ func putFunc(cmd *cobra.Command, args []string) { os.Exit(1) } - results = make(chan result) requests := make(chan v3.Op, totalClients) - bar = pb.New(putTotal) - + clients := mustCreateClients(totalClients, totalConns) k, v := make([]byte, keySize), string(mustRandBytes(valSize)) - clients := mustCreateClients(totalClients, totalConns) - + bar = pb.New(putTotal) bar.Format("Bom !") bar.Start() + r := newReport() for i := range clients { wg.Add(1) - go doPut(context.Background(), clients[i], requests) + go func(c *v3.Client) { + defer wg.Done() + for op := range requests { + st := time.Now() + _, err := c.Do(context.Background(), op) + r.Results() <- report.Result{Err: err, Start: st, End: time.Now()} + bar.Increment() + } + }(clients[i]) } - pdoneC := printReport(results) - go func() { for i := 0; i < putTotal; i++ { if seqKeys { @@ -104,28 +110,11 @@ func putFunc(cmd *cobra.Command, args []string) { }() } + rc := r.Run() wg.Wait() - + close(r.Results()) bar.Finish() - - close(results) - <-pdoneC -} - -func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) { - defer wg.Done() - - for op := range requests { - st := time.Now() - _, err := client.Do(ctx, op) - - var errStr string - if err != nil { - errStr = err.Error() - } - results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} - bar.Increment() - } + fmt.Println(<-rc) } func compactKV(clients []*v3.Client) { diff --git a/tools/benchmark/cmd/range.go b/tools/benchmark/cmd/range.go index 444e48b3b..677a8af94 100644 --- a/tools/benchmark/cmd/range.go +++ b/tools/benchmark/cmd/range.go @@ -20,6 +20,8 @@ import ( "time" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/report" + "github.com/spf13/cobra" "golang.org/x/net/context" "gopkg.in/cheggaaa/pb.v1" @@ -65,22 +67,27 @@ func rangeFunc(cmd *cobra.Command, args []string) { os.Exit(1) } - results = make(chan result) requests := make(chan v3.Op, totalClients) - bar = pb.New(rangeTotal) - clients := mustCreateClients(totalClients, totalConns) + bar = pb.New(rangeTotal) bar.Format("Bom !") bar.Start() + r := newReport() for i := range clients { wg.Add(1) - go doRange(clients[i].KV, requests) + go func(c *v3.Client) { + defer wg.Done() + for op := range requests { + st := time.Now() + _, err := c.Do(context.Background(), op) + r.Results() <- report.Result{Err: err, Start: st, End: time.Now()} + bar.Increment() + } + }(clients[i]) } - pdoneC := printReport(results) - go func() { for i := 0; i < rangeTotal; i++ { opts := []v3.OpOption{v3.WithRange(end)} @@ -93,26 +100,9 @@ func rangeFunc(cmd *cobra.Command, args []string) { close(requests) }() + rc := r.Run() wg.Wait() - + close(r.Results()) bar.Finish() - - close(results) - <-pdoneC -} - -func doRange(client v3.KV, requests <-chan v3.Op) { - defer wg.Done() - - for op := range requests { - st := time.Now() - _, err := client.Do(context.Background(), op) - - var errStr string - if err != nil { - errStr = err.Error() - } - results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} - bar.Increment() - } + fmt.Printf("%s", <-rc) } diff --git a/tools/benchmark/cmd/report.go b/tools/benchmark/cmd/report.go deleted file mode 100644 index 3d9c704de..000000000 --- a/tools/benchmark/cmd/report.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright 2014 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// the file is borrowed from github.com/rakyll/boom/boomer/print.go - -package cmd - -import ( - "fmt" - "math" - "sort" - "strings" - "time" -) - -const ( - barChar = "∎" -) - -type result struct { - errStr string - duration time.Duration - happened time.Time -} - -type report struct { - avgTotal float64 - fastest float64 - slowest float64 - average float64 - stddev float64 - rps float64 - - results chan result - total time.Duration - - errorDist map[string]int - lats []float64 - - sps *secondPoints -} - -func printReport(results chan result) <-chan struct{} { - return wrapReport(func() { - r := &report{ - results: results, - errorDist: make(map[string]int), - sps: newSecondPoints(), - } - r.finalize() - r.print() - }) -} - -func printRate(results chan result) <-chan struct{} { - return wrapReport(func() { - r := &report{ - results: results, - errorDist: make(map[string]int), - sps: newSecondPoints(), - } - r.finalize() - fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps) - }) -} - -func wrapReport(f func()) <-chan struct{} { - donec := make(chan struct{}) - go func() { - defer close(donec) - f() - }() - return donec -} - -func (r *report) finalize() { - st := time.Now() - for res := range r.results { - if res.errStr != "" { - r.errorDist[res.errStr]++ - } else { - r.sps.Add(res.happened, res.duration) - r.lats = append(r.lats, res.duration.Seconds()) - r.avgTotal += res.duration.Seconds() - } - } - r.total = time.Since(st) - - r.rps = float64(len(r.lats)) / r.total.Seconds() - r.average = r.avgTotal / float64(len(r.lats)) - for i := range r.lats { - dev := r.lats[i] - r.average - r.stddev += dev * dev - } - r.stddev = math.Sqrt(r.stddev / float64(len(r.lats))) -} - -func (r *report) print() { - sort.Float64s(r.lats) - - if len(r.lats) > 0 { - r.fastest = r.lats[0] - r.slowest = r.lats[len(r.lats)-1] - fmt.Printf("\nSummary:\n") - fmt.Printf(" Total:\t%4.4f secs.\n", r.total.Seconds()) - fmt.Printf(" Slowest:\t%4.4f secs.\n", r.slowest) - fmt.Printf(" Fastest:\t%4.4f secs.\n", r.fastest) - fmt.Printf(" Average:\t%4.4f secs.\n", r.average) - fmt.Printf(" Stddev:\t%4.4f secs.\n", r.stddev) - fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps) - r.printHistogram() - r.printLatencies() - if sample { - r.printSecondSample() - } - } - - if len(r.errorDist) > 0 { - r.printErrors() - } -} - -// Prints percentile latencies. -func (r *report) printLatencies() { - pctls := []int{10, 25, 50, 75, 90, 95, 99} - data := make([]float64, len(pctls)) - j := 0 - for i := 0; i < len(r.lats) && j < len(pctls); i++ { - current := i * 100 / len(r.lats) - if current >= pctls[j] { - data[j] = r.lats[i] - j++ - } - } - fmt.Printf("\nLatency distribution:\n") - for i := 0; i < len(pctls); i++ { - if data[i] > 0 { - fmt.Printf(" %v%% in %4.4f secs.\n", pctls[i], data[i]) - } - } -} - -func (r *report) printSecondSample() { - fmt.Println(r.sps.getTimeSeries()) -} - -func (r *report) printHistogram() { - bc := 10 - buckets := make([]float64, bc+1) - counts := make([]int, bc+1) - bs := (r.slowest - r.fastest) / float64(bc) - for i := 0; i < bc; i++ { - buckets[i] = r.fastest + bs*float64(i) - } - buckets[bc] = r.slowest - var bi int - var max int - for i := 0; i < len(r.lats); { - if r.lats[i] <= buckets[bi] { - i++ - counts[bi]++ - if max < counts[bi] { - max = counts[bi] - } - } else if bi < len(buckets)-1 { - bi++ - } - } - fmt.Printf("\nResponse time histogram:\n") - for i := 0; i < len(buckets); i++ { - // Normalize bar lengths. - var barLen int - if max > 0 { - barLen = counts[i] * 40 / max - } - fmt.Printf(" %4.3f [%v]\t|%v\n", buckets[i], counts[i], strings.Repeat(barChar, barLen)) - } -} - -func (r *report) printErrors() { - fmt.Printf("\nError distribution:\n") - for err, num := range r.errorDist { - fmt.Printf(" [%d]\t%s\n", num, err) - } -} diff --git a/tools/benchmark/cmd/root.go b/tools/benchmark/cmd/root.go index 9dbee5cf6..e52d97000 100644 --- a/tools/benchmark/cmd/root.go +++ b/tools/benchmark/cmd/root.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/coreos/etcd/pkg/transport" + "github.com/spf13/cobra" "gopkg.in/cheggaaa/pb.v1" ) @@ -38,9 +39,8 @@ var ( totalClients uint sample bool - bar *pb.ProgressBar - results chan result - wg sync.WaitGroup + bar *pb.ProgressBar + wg sync.WaitGroup tls transport.TLSInfo diff --git a/tools/benchmark/cmd/stm.go b/tools/benchmark/cmd/stm.go index 90a4572de..7f98ed100 100644 --- a/tools/benchmark/cmd/stm.go +++ b/tools/benchmark/cmd/stm.go @@ -23,6 +23,8 @@ import ( v3 "github.com/coreos/etcd/clientv3" v3sync "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/pkg/report" + "github.com/spf13/cobra" "golang.org/x/net/context" "gopkg.in/cheggaaa/pb.v1" @@ -89,22 +91,19 @@ func stmFunc(cmd *cobra.Command, args []string) { os.Exit(1) } - results = make(chan result) requests := make(chan stmApply, totalClients) - bar = pb.New(stmTotal) - clients := mustCreateClients(totalClients, totalConns) + bar = pb.New(stmTotal) bar.Format("Bom !") bar.Start() + r := newReport() for i := range clients { wg.Add(1) - go doSTM(context.Background(), clients[i], requests) + go doSTM(clients[i], requests, r.Results()) } - pdoneC := printReport(results) - go func() { for i := 0; i < stmTotal; i++ { kset := make(map[string]struct{}) @@ -132,15 +131,14 @@ func stmFunc(cmd *cobra.Command, args []string) { close(requests) }() + rc := r.Run() wg.Wait() - + close(r.Results()) bar.Finish() - - close(results) - <-pdoneC + fmt.Printf("%s", <-rc) } -func doSTM(ctx context.Context, client *v3.Client, requests <-chan stmApply) { +func doSTM(client *v3.Client, requests <-chan stmApply, results chan<- report.Result) { defer wg.Done() var m *v3sync.Mutex @@ -161,11 +159,7 @@ func doSTM(ctx context.Context, client *v3.Client, requests <-chan stmApply) { if m != nil { m.Unlock(context.TODO()) } - var errStr string - if err != nil { - errStr = err.Error() - } - results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} + results <- report.Result{Err: err, Start: st, End: time.Now()} bar.Increment() } } diff --git a/tools/benchmark/cmd/util.go b/tools/benchmark/cmd/util.go index 4a9df3c7b..d11d23f61 100644 --- a/tools/benchmark/cmd/util.go +++ b/tools/benchmark/cmd/util.go @@ -21,6 +21,7 @@ import ( "strings" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/report" ) var ( @@ -83,3 +84,10 @@ func mustRandBytes(n int) []byte { } return rb } + +func newReport() report.Report { + if sample { + return report.NewReportSample("%4.4f") + } + return report.NewReport("%4.4f") +} diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index 4cdc899d0..b73e4f20e 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -23,6 +23,7 @@ import ( "time" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/report" "github.com/spf13/cobra" "golang.org/x/net/context" @@ -110,18 +111,16 @@ func watchFunc(cmd *cobra.Command, args []string) { } // watching phase - results = make(chan result) bar = pb.New(watchTotal) - bar.Format("Bom !") bar.Start() - pdoneC := printRate(results) - atomic.StoreInt32(&nrWatchCompleted, int32(0)) watchCompletedNotifier = make(chan struct{}) + + r := report.NewReportRate("%4.4f") for i := range streams { - go doWatch(streams[i], requests) + go doWatch(streams[i], requests, r.Results()) } go func() { @@ -133,35 +132,38 @@ func watchFunc(cmd *cobra.Command, args []string) { close(requests) }() + rc := r.Run() <-watchCompletedNotifier bar.Finish() - - fmt.Printf("Watch creation summary:\n") - close(results) - <-pdoneC + close(r.Results()) + fmt.Printf("Watch creation summary:\n%s", <-rc) // put phase eventsTotal = 0 for i := 0; i < watchPutTotal; i++ { eventsTotal += numWatchers[watched[i%len(watched)]] } - results = make(chan result) - bar = pb.New(eventsTotal) + bar = pb.New(eventsTotal) bar.Format("Bom !") bar.Start() atomic.StoreInt32(&nrRecvCompleted, 0) recvCompletedNotifier = make(chan struct{}) - putreqc := make(chan v3.Op) + r = report.NewReportRate("%4.4f") for i := 0; i < watchPutTotal; i++ { - go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc) + go func(c *v3.Client) { + for op := range putreqc { + if _, err := c.Do(context.TODO(), op); err != nil { + fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err) + os.Exit(1) + } + } + }(clients[i%len(clients)]) } - pdoneC = printRate(results) - go func() { for i := 0; i < watchPutTotal; i++ { putreqc <- v3.OpPut(watched[i%(len(watched))], "data") @@ -171,24 +173,20 @@ func watchFunc(cmd *cobra.Command, args []string) { close(putreqc) }() + rc = r.Run() <-recvCompletedNotifier bar.Finish() - fmt.Printf("Watch events received summary:\n") - close(results) - <-pdoneC + close(r.Results()) + fmt.Printf("Watch events received summary:\n%s", <-rc) } -func doWatch(stream v3.Watcher, requests <-chan string) { +func doWatch(stream v3.Watcher, requests <-chan string, results chan<- report.Result) { for r := range requests { st := time.Now() wch := stream.Watch(context.TODO(), r) - var errStr string - if wch == nil { - errStr = "could not open watch channel" - } - results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} + results <- report.Result{Start: st, End: time.Now()} bar.Increment() - go recvWatchChan(wch) + go recvWatchChan(wch, results) } atomic.AddInt32(&nrWatchCompleted, 1) if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) { @@ -196,11 +194,11 @@ func doWatch(stream v3.Watcher, requests <-chan string) { } } -func recvWatchChan(wch v3.WatchChan) { +func recvWatchChan(wch v3.WatchChan, results chan<- report.Result) { for r := range wch { st := time.Now() for range r.Events { - results <- result{duration: time.Since(st), happened: time.Now()} + results <- report.Result{Start: st, End: time.Now()} bar.Increment() atomic.AddInt32(&nrRecvCompleted, 1) } @@ -211,13 +209,3 @@ func recvWatchChan(wch v3.WatchChan) { } } } - -func doPutForWatch(ctx context.Context, client v3.KV, requests <-chan v3.Op) { - for op := range requests { - _, err := client.Do(ctx, op) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err) - os.Exit(1) - } - } -} diff --git a/tools/benchmark/cmd/watch_get.go b/tools/benchmark/cmd/watch_get.go index 53d53d423..3eb4a1be2 100644 --- a/tools/benchmark/cmd/watch_get.go +++ b/tools/benchmark/cmd/watch_get.go @@ -20,6 +20,7 @@ import ( "time" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/pkg/report" "github.com/spf13/cobra" "golang.org/x/net/context" @@ -70,47 +71,39 @@ func watchGetFunc(cmd *cobra.Command, args []string) { streams[i] = v3.NewWatcher(clients[i%len(clients)]) } - // results from trying to do serialized gets with concurrent watchers - results = make(chan result) - bar = pb.New(watchGetTotalWatchers * watchEvents) bar.Format("Bom !") bar.Start() - pdoneC := printReport(results) - wg.Add(watchGetTotalWatchers) + // report from trying to do serialized gets with concurrent watchers + r := newReport() ctx, cancel := context.WithCancel(context.TODO()) f := func() { - doSerializedGet(ctx, getClient[0], results) + defer close(r.Results()) + for { + st := time.Now() + _, err := getClient[0].Get(ctx, "abc", v3.WithSerializable()) + if ctx.Err() != nil { + break + } + r.Results() <- report.Result{Err: err, Start: st, End: time.Now()} + } } + + wg.Add(watchGetTotalWatchers) for i := 0; i < watchGetTotalWatchers; i++ { go doUnsyncWatch(streams[i%len(streams)], watchRev, f) } + + rc := r.Run() wg.Wait() cancel() bar.Finish() - fmt.Printf("Get during watch summary:\n") - <-pdoneC -} - -func doSerializedGet(ctx context.Context, client *v3.Client, results chan result) { - for { - st := time.Now() - _, err := client.Get(ctx, "abc", v3.WithSerializable()) - if ctx.Err() != nil { - break - } - var errStr string - if err != nil { - errStr = err.Error() - } - res := result{errStr: errStr, duration: time.Since(st), happened: time.Now()} - results <- res - } - close(results) + fmt.Printf("Get during watch summary:\n%s", <-rc) } func doUnsyncWatch(stream v3.Watcher, rev int64, f func()) { + defer wg.Done() wch := stream.Watch(context.TODO(), "watchkey", v3.WithRev(rev)) if wch == nil { panic("could not open watch channel") @@ -122,5 +115,4 @@ func doUnsyncWatch(stream v3.Watcher, rev int64, f func()) { i += len(wev.Events) bar.Add(len(wev.Events)) } - wg.Done() }