diff --git a/tools/benchmark/cmd/put.go b/tools/benchmark/cmd/put.go index f1d5aed0b..bd7443fee 100644 --- a/tools/benchmark/cmd/put.go +++ b/tools/benchmark/cmd/put.go @@ -46,6 +46,8 @@ var ( compactInterval time.Duration compactIndexDelta int64 + + sample bool ) func init() { @@ -57,6 +59,7 @@ func init() { putCmd.Flags().BoolVar(&seqKeys, "sequential-keys", false, "Use sequential keys") putCmd.Flags().DurationVar(&compactInterval, "compact-interval", 0, `Interval to compact database (do not duplicate this with etcd's 'auto-compaction-retention' flag) (e.g. --compact-interval=5m compacts every 5-minute)`) putCmd.Flags().Int64Var(&compactIndexDelta, "compact-index-delta", 1000, "Delta between current revision and compact revision (e.g. current revision 10000, compact at 9000)") + putCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second") } func putFunc(cmd *cobra.Command, args []string) { @@ -123,7 +126,7 @@ func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) { if err != nil { errStr = err.Error() } - results <- result{errStr: errStr, duration: time.Since(st)} + results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} bar.Increment() } } diff --git a/tools/benchmark/cmd/range.go b/tools/benchmark/cmd/range.go index 3471e2f43..17fffed32 100644 --- a/tools/benchmark/cmd/range.go +++ b/tools/benchmark/cmd/range.go @@ -42,6 +42,7 @@ func init() { RootCmd.AddCommand(rangeCmd) rangeCmd.Flags().IntVar(&rangeTotal, "total", 10000, "Total number of range requests") rangeCmd.Flags().StringVar(&rangeConsistency, "consistency", "l", "Linearizable(l) or Serializable(s)") + rangeCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second") } func rangeFunc(cmd *cobra.Command, args []string) { @@ -112,7 +113,7 @@ func doRange(client v3.KV, requests <-chan v3.Op) { if err != nil { errStr = err.Error() } - results <- result{errStr: errStr, duration: time.Since(st)} + results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} bar.Increment() } } diff --git a/tools/benchmark/cmd/report.go b/tools/benchmark/cmd/report.go index 61482c056..18c601689 100644 --- a/tools/benchmark/cmd/report.go +++ b/tools/benchmark/cmd/report.go @@ -31,6 +31,7 @@ const ( type result struct { errStr string duration time.Duration + happened time.Time } type report struct { @@ -46,6 +47,8 @@ type report struct { errorDist map[string]int lats []float64 + + sps *secondPoints } func printReport(results chan result) <-chan struct{} { @@ -53,6 +56,7 @@ func printReport(results chan result) <-chan struct{} { r := &report{ results: results, errorDist: make(map[string]int), + sps: newSecondPoints(), } r.finalize() r.print() @@ -64,6 +68,7 @@ func printRate(results chan result) <-chan struct{} { r := &report{ results: results, errorDist: make(map[string]int), + sps: newSecondPoints(), } r.finalize() fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps) @@ -85,6 +90,7 @@ func (r *report) finalize() { if res.errStr != "" { r.errorDist[res.errStr]++ } else { + r.sps.Add(res.happened, res.duration) r.lats = append(r.lats, res.duration.Seconds()) r.avgTotal += res.duration.Seconds() } @@ -115,6 +121,9 @@ func (r *report) print() { fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps) r.printHistogram() r.printLatencies() + if sample { + r.printSecondSample() + } } if len(r.errorDist) > 0 { @@ -142,6 +151,10 @@ func (r *report) printLatencies() { } } +func (r *report) printSecondSample() { + fmt.Println(r.sps.getTimeSeries()) +} + func (r *report) printHistogram() { bc := 10 buckets := make([]float64, bc+1) diff --git a/tools/benchmark/cmd/timeseries.go b/tools/benchmark/cmd/timeseries.go new file mode 100644 index 000000000..e727787d1 --- /dev/null +++ b/tools/benchmark/cmd/timeseries.go @@ -0,0 +1,91 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "bytes" + "fmt" + "sort" + "sync" + "time" +) + +type timeSeries struct { + timestamp int64 + avgLatency time.Duration + throughPut int64 +} + +type TimeSeries []timeSeries + +func (t TimeSeries) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t TimeSeries) Len() int { return len(t) } +func (t TimeSeries) Less(i, j int) bool { return t[i].timestamp < t[j].timestamp } + +type secondPoint struct { + totalLatency time.Duration + count int64 +} + +type secondPoints struct { + mu sync.Mutex + tm map[int64]secondPoint +} + +func newSecondPoints() *secondPoints { + return &secondPoints{tm: make(map[int64]secondPoint)} +} + +func (sp *secondPoints) Add(ts time.Time, lat time.Duration) { + sp.mu.Lock() + defer sp.mu.Unlock() + + tk := ts.Unix() + if v, ok := sp.tm[tk]; !ok { + sp.tm[tk] = secondPoint{totalLatency: lat, count: 1} + } else { + v.totalLatency += lat + v.count += 1 + sp.tm[tk] = v + } +} + +func (sp *secondPoints) getTimeSeries() TimeSeries { + sp.mu.Lock() + defer sp.mu.Unlock() + + tslice := make(TimeSeries, len(sp.tm)) + i := 0 + for k, v := range sp.tm { + tslice[i] = timeSeries{ + timestamp: k, + avgLatency: time.Duration(v.totalLatency) / time.Duration(v.count), + throughPut: v.count, + } + i++ + } + sort.Sort(tslice) + return tslice +} + +func (ts TimeSeries) String() string { + buf := new(bytes.Buffer) + buf.WriteString("Sample in one second (unix latency throughput):\n") + for i := range ts { + buf.WriteString(fmt.Sprintf("%7d %10s %5d\n", ts[i].timestamp, ts[i].avgLatency, ts[i].throughPut)) + } + buf.WriteString("\n") + return buf.String() +} diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index 6bc83ec00..66d3eb3a9 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -80,6 +80,7 @@ func init() { watchCmd.Flags().IntVar(&watchKeySize, "key-size", 32, "Key size of watch request") watchCmd.Flags().IntVar(&watchKeySpaceSize, "key-space-size", 1, "Maximum possible keys") watchCmd.Flags().BoolVar(&watchSeqKeys, "sequential-keys", false, "Use sequential keys") + watchCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second") } func watchFunc(cmd *cobra.Command, args []string) { @@ -184,7 +185,7 @@ func doWatch(stream v3.Watcher, requests <-chan string) { if wch == nil { errStr = "could not open watch channel" } - results <- result{errStr: errStr, duration: time.Since(st)} + results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} bar.Increment() go recvWatchChan(wch) } @@ -204,7 +205,7 @@ func recvWatchChan(wch v3.WatchChan) { } st := time.Now() - results <- result{duration: time.Since(st)} + results <- result{duration: time.Since(st), happened: time.Now()} bar.Increment() atomic.AddInt32(&nrRecvCompleted, 1)