From c09f0ca9d427de324aba9c96051f7f09bcb837b3 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 17 May 2017 16:05:08 -0700 Subject: [PATCH] report: add NewWeightedReport Reports with weighted results. --- pkg/report/report.go | 114 +++++++++++++++++--------------------- pkg/report/report_test.go | 31 +++++++++++ pkg/report/weighted.go | 101 +++++++++++++++++++++++++++++++++ 3 files changed, 184 insertions(+), 62 deletions(-) create mode 100644 pkg/report/weighted.go diff --git a/pkg/report/report.go b/pkg/report/report.go index 10adcd52d..77e29838a 100644 --- a/pkg/report/report.go +++ b/pkg/report/report.go @@ -30,9 +30,10 @@ const ( // Result describes the timings for an operation. type Result struct { - Start time.Time - End time.Time - Err error + Start time.Time + End time.Time + Err error + Weight float64 } func (res *Result) Duration() time.Duration { return res.End.Sub(res.Start) } @@ -41,18 +42,8 @@ type report struct { results chan Result precision string - avgTotal float64 - fastest float64 - slowest float64 - average float64 - stddev float64 - rps float64 - total time.Duration - - errorDist map[string]int - lats []float64 - - sps *secondPoints + stats Stats + sps *secondPoints } // Stats exposes results raw data. @@ -69,6 +60,13 @@ type Stats struct { TimeSeries TimeSeries } +func (s *Stats) copy() Stats { + ss := *s + ss.ErrorDist = copyMap(ss.ErrorDist) + ss.Lats = copyFloats(ss.Lats) + return ss +} + // Report processes a result stream until it is closed, then produces a // string with information about the consumed result data. type Report interface { @@ -81,12 +79,15 @@ type Report interface { Stats() <-chan Stats } -func NewReport(precision string) Report { - return &report{ +func NewReport(precision string) Report { return newReport(precision) } + +func newReport(precision string) *report { + r := &report{ results: make(chan Result, 16), precision: precision, - errorDist: make(map[string]int), } + r.stats.ErrorDist = make(map[string]int) + return r } func NewReportSample(precision string) Report { @@ -112,22 +113,11 @@ func (r *report) Stats() <-chan Stats { go func() { defer close(donec) r.processResults() - var ts TimeSeries + s := r.stats.copy() if r.sps != nil { - ts = r.sps.getTimeSeries() - } - donec <- Stats{ - AvgTotal: r.avgTotal, - Fastest: r.fastest, - Slowest: r.slowest, - Average: r.average, - Stddev: r.stddev, - RPS: r.rps, - Total: r.total, - ErrorDist: copyMap(r.errorDist), - Lats: copyFloats(r.lats), - TimeSeries: ts, + s.TimeSeries = r.sps.getTimeSeries() } + donec <- s }() return donec } @@ -147,21 +137,21 @@ func copyFloats(s []float64) (c []float64) { } func (r *report) String() (s string) { - if len(r.lats) > 0 { + if len(r.stats.Lats) > 0 { s += fmt.Sprintf("\nSummary:\n") - s += fmt.Sprintf(" Total:\t%s.\n", r.sec2str(r.total.Seconds())) - s += fmt.Sprintf(" Slowest:\t%s.\n", r.sec2str(r.slowest)) - s += fmt.Sprintf(" Fastest:\t%s.\n", r.sec2str(r.fastest)) - s += fmt.Sprintf(" Average:\t%s.\n", r.sec2str(r.average)) - s += fmt.Sprintf(" Stddev:\t%s.\n", r.sec2str(r.stddev)) - s += fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps) + s += fmt.Sprintf(" Total:\t%s.\n", r.sec2str(r.stats.Total.Seconds())) + s += fmt.Sprintf(" Slowest:\t%s.\n", r.sec2str(r.stats.Slowest)) + s += fmt.Sprintf(" Fastest:\t%s.\n", r.sec2str(r.stats.Fastest)) + s += fmt.Sprintf(" Average:\t%s.\n", r.sec2str(r.stats.Average)) + s += fmt.Sprintf(" Stddev:\t%s.\n", r.sec2str(r.stats.Stddev)) + s += fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.stats.RPS) s += r.histogram() s += r.sprintLatencies() if r.sps != nil { s += fmt.Sprintf("%v\n", r.sps.getTimeSeries()) } } - if len(r.errorDist) > 0 { + if len(r.stats.ErrorDist) > 0 { s += r.errors() } return s @@ -176,17 +166,17 @@ func NewReportRate(precision string) Report { } func (r *reportRate) String() string { - return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps) + return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.stats.RPS) } func (r *report) processResult(res *Result) { if res.Err != nil { - r.errorDist[res.Err.Error()]++ + r.stats.ErrorDist[res.Err.Error()]++ return } dur := res.Duration() - r.lats = append(r.lats, dur.Seconds()) - r.avgTotal += dur.Seconds() + r.stats.Lats = append(r.stats.Lats, dur.Seconds()) + r.stats.AvgTotal += dur.Seconds() if r.sps != nil { r.sps.Add(res.Start, dur) } @@ -197,19 +187,19 @@ func (r *report) processResults() { for res := range r.results { r.processResult(&res) } - r.total = time.Since(st) + r.stats.Total = time.Since(st) - r.rps = float64(len(r.lats)) / r.total.Seconds() - r.average = r.avgTotal / float64(len(r.lats)) - for i := range r.lats { - dev := r.lats[i] - r.average - r.stddev += dev * dev + r.stats.RPS = float64(len(r.stats.Lats)) / r.stats.Total.Seconds() + r.stats.Average = r.stats.AvgTotal / float64(len(r.stats.Lats)) + for i := range r.stats.Lats { + dev := r.stats.Lats[i] - r.stats.Average + r.stats.Stddev += dev * dev } - r.stddev = math.Sqrt(r.stddev / float64(len(r.lats))) - sort.Float64s(r.lats) - if len(r.lats) > 0 { - r.fastest = r.lats[0] - r.slowest = r.lats[len(r.lats)-1] + r.stats.Stddev = math.Sqrt(r.stats.Stddev / float64(len(r.stats.Lats))) + sort.Float64s(r.stats.Lats) + if len(r.stats.Lats) > 0 { + r.stats.Fastest = r.stats.Lats[0] + r.stats.Slowest = r.stats.Lats[len(r.stats.Lats)-1] } } @@ -235,7 +225,7 @@ func percentiles(nums []float64) (data []float64) { } func (r *report) sprintLatencies() string { - data := percentiles(r.lats) + data := percentiles(r.stats.Lats) s := fmt.Sprintf("\nLatency distribution:\n") for i := 0; i < len(pctls); i++ { if data[i] > 0 { @@ -249,15 +239,15 @@ func (r *report) histogram() string { bc := 10 buckets := make([]float64, bc+1) counts := make([]int, bc+1) - bs := (r.slowest - r.fastest) / float64(bc) + bs := (r.stats.Slowest - r.stats.Fastest) / float64(bc) for i := 0; i < bc; i++ { - buckets[i] = r.fastest + bs*float64(i) + buckets[i] = r.stats.Fastest + bs*float64(i) } - buckets[bc] = r.slowest + buckets[bc] = r.stats.Slowest var bi int var max int - for i := 0; i < len(r.lats); { - if r.lats[i] <= buckets[bi] { + for i := 0; i < len(r.stats.Lats); { + if r.stats.Lats[i] <= buckets[bi] { i++ counts[bi]++ if max < counts[bi] { @@ -281,7 +271,7 @@ func (r *report) histogram() string { func (r *report) errors() string { s := fmt.Sprintf("\nError distribution:\n") - for err, num := range r.errorDist { + for err, num := range r.stats.ErrorDist { s += fmt.Sprintf(" [%d]\t%s\n", num, err) } return s diff --git a/pkg/report/report_test.go b/pkg/report/report_test.go index b56183e4a..6f073f3e8 100644 --- a/pkg/report/report_test.go +++ b/pkg/report/report_test.go @@ -81,3 +81,34 @@ func TestReport(t *testing.T) { } } } + +func TestWeightedReport(t *testing.T) { + r := NewWeightedReport(NewReport("%f"), "%f") + go func() { + start := time.Now() + for i := 0; i < 5; i++ { + end := start.Add(time.Second) + r.Results() <- Result{Start: start, End: end, Weight: 2.0} + start = end + } + r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")} + close(r.Results()) + }() + + stats := <-r.Stats() + stats.TimeSeries = nil // ignore timeseries since it uses wall clock + wStats := Stats{ + AvgTotal: 10.0, + Fastest: 0.5, + Slowest: 0.5, + Average: 0.5, + Stddev: 0.0, + Total: stats.Total, + RPS: 10.0 / stats.Total.Seconds(), + ErrorDist: map[string]int{"oops": 1}, + Lats: []float64{0.5, 0.5, 0.5, 0.5, 0.5}, + } + if !reflect.DeepEqual(stats, wStats) { + t.Fatalf("got %+v, want %+v", stats, wStats) + } +} diff --git a/pkg/report/weighted.go b/pkg/report/weighted.go new file mode 100644 index 000000000..411214f6d --- /dev/null +++ b/pkg/report/weighted.go @@ -0,0 +1,101 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// the file is borrowed from github.com/rakyll/boom/boomer/print.go + +package report + +import ( + "time" +) + +type weightedReport struct { + baseReport Report + + report *report + results chan Result + weightTotal float64 +} + +// NewWeightedReport returns a report that includes +// both weighted and unweighted statistics. +func NewWeightedReport(r Report, precision string) Report { + return &weightedReport{ + baseReport: r, + report: newReport(precision), + results: make(chan Result, 16), + } +} + +func (wr *weightedReport) Results() chan<- Result { return wr.results } + +func (wr *weightedReport) Run() <-chan string { + donec := make(chan string, 2) + go func() { + defer close(donec) + basec, rc := make(chan string, 1), make(chan Stats, 1) + go func() { basec <- (<-wr.baseReport.Run()) }() + go func() { rc <- (<-wr.report.Stats()) }() + go wr.processResults() + wr.report.stats = wr.reweighStat(<-rc) + donec <- wr.report.String() + donec <- (<-basec) + }() + return donec +} + +func (wr *weightedReport) Stats() <-chan Stats { + donec := make(chan Stats, 2) + go func() { + defer close(donec) + basec, rc := make(chan Stats, 1), make(chan Stats, 1) + go func() { basec <- (<-wr.baseReport.Stats()) }() + go func() { rc <- (<-wr.report.Stats()) }() + go wr.processResults() + donec <- wr.reweighStat(<-rc) + donec <- (<-basec) + }() + return donec +} + +func (wr *weightedReport) processResults() { + defer close(wr.report.results) + defer close(wr.baseReport.Results()) + for res := range wr.results { + wr.processResult(res) + wr.baseReport.Results() <- res + } +} + +func (wr *weightedReport) processResult(res Result) { + if res.Err != nil { + wr.report.results <- res + return + } + if res.Weight == 0 { + res.Weight = 1.0 + } + wr.weightTotal += res.Weight + res.End = res.Start.Add(time.Duration(float64(res.End.Sub(res.Start)) / res.Weight)) + res.Weight = 1.0 + wr.report.results <- res +} + +func (wr *weightedReport) reweighStat(s Stats) Stats { + weightCoef := wr.weightTotal / float64(len(s.Lats)) + // weight > 1 => processing more than one request + s.RPS *= weightCoef + s.AvgTotal *= weightCoef * weightCoef + return s +}