benchmark: printSecondSample with time series

Gyu-Ho Lee 2016-03-14 16:53:04 -07:00
parent d1ece7d621
commit 5eefff12e1
5 changed files with 113 additions and 4 deletions


@ -46,6 +46,8 @@ var (
	compactInterval   time.Duration
	compactIndexDelta int64

	sample bool
)

func init() {
@ -57,6 +59,7 @@ func init() {
	putCmd.Flags().BoolVar(&seqKeys, "sequential-keys", false, "Use sequential keys")
	putCmd.Flags().DurationVar(&compactInterval, "compact-interval", 0, `Interval to compact database (do not duplicate this with etcd's 'auto-compaction-retention' flag) (e.g. --compact-interval=5m compacts every 5-minute)`)
	putCmd.Flags().Int64Var(&compactIndexDelta, "compact-index-delta", 1000, "Delta between current revision and compact revision (e.g. current revision 10000, compact at 9000)")
	putCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second")
}

func putFunc(cmd *cobra.Command, args []string) {
@ -123,7 +126,7 @@ func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
		if err != nil {
			errStr = err.Error()
		}
		results <- result{errStr: errStr, duration: time.Since(st)}
		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
		bar.Increment()
	}
}
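
The pattern in doPut above (and in doRange/doWatch below) is the same: each completed request is stamped with both its latency and the wall-clock time it finished, so the reporter can later bucket results by second. A minimal, standalone sketch of that idea; the loop and the simulated work are illustrative, only the result fields mirror the commit:

package main

import (
	"fmt"
	"time"
)

// result mirrors the struct in report.go: latency plus the completion time.
type result struct {
	errStr   string
	duration time.Duration
	happened time.Time
}

func main() {
	results := make(chan result, 4)
	for i := 0; i < 4; i++ {
		st := time.Now()
		time.Sleep(5 * time.Millisecond) // stands in for the actual KV request
		results <- result{duration: time.Since(st), happened: time.Now()}
	}
	close(results)

	for r := range results {
		// happened.Unix() is the key the per-second sampler buckets on.
		fmt.Println(r.happened.Unix(), r.duration)
	}
}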


@ -42,6 +42,7 @@ func init() {
	RootCmd.AddCommand(rangeCmd)
	rangeCmd.Flags().IntVar(&rangeTotal, "total", 10000, "Total number of range requests")
	rangeCmd.Flags().StringVar(&rangeConsistency, "consistency", "l", "Linearizable(l) or Serializable(s)")
	rangeCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second")
}

func rangeFunc(cmd *cobra.Command, args []string) {
@ -112,7 +113,7 @@ func doRange(client v3.KV, requests <-chan v3.Op) {
		if err != nil {
			errStr = err.Error()
		}
		results <- result{errStr: errStr, duration: time.Since(st)}
		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
		bar.Increment()
	}
}


@ -31,6 +31,7 @@ const (
type result struct {
	errStr   string
	duration time.Duration
	happened time.Time
}

type report struct {
@ -46,6 +47,8 @@ type report struct {
	errorDist map[string]int
	lats      []float64

	sps *secondPoints
}

func printReport(results chan result) <-chan struct{} {
@ -53,6 +56,7 @@ func printReport(results chan result) <-chan struct{} {
		r := &report{
			results:   results,
			errorDist: make(map[string]int),
			sps:       newSecondPoints(),
		}
		r.finalize()
		r.print()
@ -64,6 +68,7 @@ func printRate(results chan result) <-chan struct{} {
		r := &report{
			results:   results,
			errorDist: make(map[string]int),
			sps:       newSecondPoints(),
		}
		r.finalize()
		fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
@ -85,6 +90,7 @@ func (r *report) finalize() {
		if res.errStr != "" {
			r.errorDist[res.errStr]++
		} else {
			r.sps.Add(res.happened, res.duration)
			r.lats = append(r.lats, res.duration.Seconds())
			r.avgTotal += res.duration.Seconds()
		}
@ -115,6 +121,9 @@ func (r *report) print() {
		fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
		r.printHistogram()
		r.printLatencies()
		if sample {
			r.printSecondSample()
		}
	}

	if len(r.errorDist) > 0 {
@ -142,6 +151,10 @@ func (r *report) printLatencies() {
	}
}

func (r *report) printSecondSample() {
	fmt.Println(r.sps.getTimeSeries())
}

func (r *report) printHistogram() {
	bc := 10
	buckets := make([]float64, bc+1)
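
Taken together, the report.go hunks wire the sampling in as follows: finalize() folds every successful result into the per-second map via sps.Add, and print() emits the sampled table only when the --sample flag was given. A condensed sketch of that flow, assuming it sits in the same cmd package (consumeSketch is a hypothetical helper, not part of the commit):

package cmd

import "fmt"

// consumeSketch drains a results channel the way finalize/print do:
// successful results feed the per-second aggregator, and the table is
// printed only when --sample was set.
func consumeSketch(results <-chan result) {
	sps := newSecondPoints()
	for res := range results {
		if res.errStr == "" {
			sps.Add(res.happened, res.duration)
		}
	}
	if sample {
		fmt.Println(sps.getTimeSeries()) // same output as printSecondSample
	}
}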


@ -0,0 +1,91 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
	"bytes"
	"fmt"
	"sort"
	"sync"
	"time"
)

type timeSeries struct {
	timestamp  int64
	avgLatency time.Duration
	throughPut int64
}

type TimeSeries []timeSeries

func (t TimeSeries) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
func (t TimeSeries) Len() int           { return len(t) }
func (t TimeSeries) Less(i, j int) bool { return t[i].timestamp < t[j].timestamp }

type secondPoint struct {
	totalLatency time.Duration
	count        int64
}

type secondPoints struct {
	mu sync.Mutex
	tm map[int64]secondPoint
}

func newSecondPoints() *secondPoints {
	return &secondPoints{tm: make(map[int64]secondPoint)}
}

func (sp *secondPoints) Add(ts time.Time, lat time.Duration) {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	tk := ts.Unix()
	if v, ok := sp.tm[tk]; !ok {
		sp.tm[tk] = secondPoint{totalLatency: lat, count: 1}
	} else {
		v.totalLatency += lat
		v.count += 1
		sp.tm[tk] = v
	}
}

func (sp *secondPoints) getTimeSeries() TimeSeries {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	tslice := make(TimeSeries, len(sp.tm))
	i := 0
	for k, v := range sp.tm {
		tslice[i] = timeSeries{
			timestamp:  k,
			avgLatency: time.Duration(v.totalLatency) / time.Duration(v.count),
			throughPut: v.count,
		}
		i++
	}
	sort.Sort(tslice)

	return tslice
}

func (ts TimeSeries) String() string {
	buf := new(bytes.Buffer)
	buf.WriteString("Sample in one second (unix latency throughput):\n")
	for i := range ts {
		buf.WriteString(fmt.Sprintf("%7d %10s %5d\n", ts[i].timestamp, ts[i].avgLatency, ts[i].throughPut))
	}
	buf.WriteString("\n")
	return buf.String()
}
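
To see what the aggregation above produces, here is a small sketch that exercises secondPoints. Because the type is unexported it would have to live in the same cmd package (for example in a _test.go file); the test name and the sample latencies are illustrative:

package cmd

import (
	"fmt"
	"testing"
	"time"
)

func TestSecondPointsSketch(t *testing.T) {
	sp := newSecondPoints()
	now := time.Now()

	// Three requests finish in the same wall-clock second, one in the next.
	sp.Add(now, 10*time.Millisecond)
	sp.Add(now, 20*time.Millisecond)
	sp.Add(now, 30*time.Millisecond)
	sp.Add(now.Add(time.Second), 40*time.Millisecond)

	ts := sp.getTimeSeries()
	if len(ts) != 2 {
		t.Fatalf("expected 2 one-second buckets, got %d", len(ts))
	}
	// First bucket: throughput 3, average latency (10+20+30)/3 = 20ms.
	fmt.Print(ts) // rendered by TimeSeries.String()
}

The printed table is the one printSecondSample shows: a "Sample in one second (unix latency throughput):" header followed by one row per Unix second with its average latency and request count.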


@ -80,6 +80,7 @@ func init() {
	watchCmd.Flags().IntVar(&watchKeySize, "key-size", 32, "Key size of watch request")
	watchCmd.Flags().IntVar(&watchKeySpaceSize, "key-space-size", 1, "Maximum possible keys")
	watchCmd.Flags().BoolVar(&watchSeqKeys, "sequential-keys", false, "Use sequential keys")
	watchCmd.Flags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second")
}

func watchFunc(cmd *cobra.Command, args []string) {
@ -184,7 +185,7 @@ func doWatch(stream v3.Watcher, requests <-chan string) {
		if wch == nil {
			errStr = "could not open watch channel"
		}
		results <- result{errStr: errStr, duration: time.Since(st)}
		results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
		bar.Increment()
		go recvWatchChan(wch)
	}
@ -204,7 +205,7 @@ func recvWatchChan(wch v3.WatchChan) {
		}

		st := time.Now()
		results <- result{duration: time.Since(st)}
		results <- result{duration: time.Since(st), happened: time.Now()}
		bar.Increment()
		atomic.AddInt32(&nrRecvCompleted, 1)