reproduce write latency

Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
Chao Chen 2024-06-03 23:27:35 -07:00
parent a0aee63128
commit da66e643a2
2 changed files with 23 additions and 41 deletions

View File

@ -83,7 +83,7 @@ func NewReport(precision string) Report { return newReport(precision) }
func newReport(precision string) *report {
r := &report{
results: make(chan Result, 16),
results: make(chan Result, 65536),
precision: precision,
}
r.stats.ErrorDist = make(map[string]int)

View File

@ -18,11 +18,11 @@ import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
"github.com/cheggaaa/pb/v3"
"github.com/spf13/cobra"
"golang.org/x/time/rate"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/report"
@ -61,7 +61,7 @@ func init() {
}
func watchLatencyFunc(_ *cobra.Command, _ []string) {
key := string(mustRandBytes(watchLKeySize))
key := "/registry/pods"
value := string(mustRandBytes(watchLValueSize))
wchs := setupWatchChannels(key)
putClient := mustCreateConn()
@ -69,15 +69,8 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) {
bar = pb.New(watchLPutTotal * len(wchs))
bar.Start()
limiter := rate.NewLimiter(rate.Limit(watchLPutRate), watchLPutRate)
putTimes := make([]time.Time, watchLPutTotal)
eventTimes := make([][]time.Time, len(wchs))
for i, wch := range wchs {
for _, wch := range wchs {
wch := wch
i := i
eventTimes[i] = make([]time.Time, watchLPutTotal)
wg.Add(1)
go func() {
defer wg.Done()
@ -85,7 +78,6 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) {
for eventCount < watchLPutTotal {
resp := <-wch
for range resp.Events {
eventTimes[i][eventCount] = time.Now()
eventCount++
bar.Increment()
}
@ -95,40 +87,30 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) {
putReport := newReport()
putReportResults := putReport.Run()
watchReport := newReport()
watchReportResults := watchReport.Run()
for i := 0; i < watchLPutTotal; i++ {
// limit key put as per reqRate
if err := limiter.Wait(context.TODO()); err != nil {
break
}
start := time.Now()
if _, err := putClient.Put(context.TODO(), key, value); err != nil {
fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
os.Exit(1)
}
end := time.Now()
putReport.Results() <- report.Result{Start: start, End: end}
putTimes[i] = end
var putCount atomic.Uint64
for i := 0; i < watchLPutRate; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if putCount.Load() >= uint64(watchLPutTotal) {
return
}
start := time.Now()
if _, err := putClient.Put(context.TODO(), key, value); err != nil {
fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
}
end := time.Now()
putReport.Results() <- report.Result{Start: start, End: end}
putCount.Add(1)
}
}()
}
wg.Wait()
close(putReport.Results())
bar.Finish()
fmt.Printf("\nPut summary:\n%s", <-putReportResults)
for i := 0; i < len(wchs); i++ {
for j := 0; j < watchLPutTotal; j++ {
start := putTimes[j]
end := eventTimes[i][j]
if end.Before(start) {
start = end
}
watchReport.Results() <- report.Result{Start: start, End: end}
}
}
close(watchReport.Results())
fmt.Printf("\nWatch events summary:\n%s", <-watchReportResults)
}
func setupWatchChannels(key string) []clientv3.WatchChan {