Merge da66e643a2011f961298478fdbe4d6d1f6ef75f0 into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
chao 2024-09-26 22:00:05 +01:00 committed by GitHub
commit 77ba14707c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 41 deletions

View File

@ -83,7 +83,7 @@ func NewReport(precision string) Report { return newReport(precision) }
func newReport(precision string) *report { func newReport(precision string) *report {
r := &report{ r := &report{
results: make(chan Result, 16), results: make(chan Result, 65536),
precision: precision, precision: precision,
} }
r.stats.ErrorDist = make(map[string]int) r.stats.ErrorDist = make(map[string]int)

View File

@ -18,11 +18,11 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"sync/atomic"
"time" "time"
"github.com/cheggaaa/pb/v3" "github.com/cheggaaa/pb/v3"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"golang.org/x/time/rate"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/report" "go.etcd.io/etcd/pkg/v3/report"
@ -61,7 +61,7 @@ func init() {
} }
func watchLatencyFunc(_ *cobra.Command, _ []string) { func watchLatencyFunc(_ *cobra.Command, _ []string) {
key := string(mustRandBytes(watchLKeySize)) key := "/registry/pods"
value := string(mustRandBytes(watchLValueSize)) value := string(mustRandBytes(watchLValueSize))
wchs := setupWatchChannels(key) wchs := setupWatchChannels(key)
putClient := mustCreateConn() putClient := mustCreateConn()
@ -69,15 +69,8 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) {
bar = pb.New(watchLPutTotal * len(wchs)) bar = pb.New(watchLPutTotal * len(wchs))
bar.Start() bar.Start()
limiter := rate.NewLimiter(rate.Limit(watchLPutRate), watchLPutRate) for _, wch := range wchs {
putTimes := make([]time.Time, watchLPutTotal)
eventTimes := make([][]time.Time, len(wchs))
for i, wch := range wchs {
wch := wch wch := wch
i := i
eventTimes[i] = make([]time.Time, watchLPutTotal)
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
@ -85,7 +78,6 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) {
for eventCount < watchLPutTotal { for eventCount < watchLPutTotal {
resp := <-wch resp := <-wch
for range resp.Events { for range resp.Events {
eventTimes[i][eventCount] = time.Now()
eventCount++ eventCount++
bar.Increment() bar.Increment()
} }
@ -95,40 +87,30 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) {
putReport := newReport() putReport := newReport()
putReportResults := putReport.Run() putReportResults := putReport.Run()
watchReport := newReport()
watchReportResults := watchReport.Run() var putCount atomic.Uint64
for i := 0; i < watchLPutTotal; i++ { for i := 0; i < watchLPutRate; i++ {
// limit key put as per reqRate wg.Add(1)
if err := limiter.Wait(context.TODO()); err != nil { go func() {
break defer wg.Done()
for {
if putCount.Load() >= uint64(watchLPutTotal) {
return
} }
start := time.Now() start := time.Now()
if _, err := putClient.Put(context.TODO(), key, value); err != nil { if _, err := putClient.Put(context.TODO(), key, value); err != nil {
fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err) fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
os.Exit(1)
} }
end := time.Now() end := time.Now()
putReport.Results() <- report.Result{Start: start, End: end} putReport.Results() <- report.Result{Start: start, End: end}
putTimes[i] = end putCount.Add(1)
}
}()
} }
wg.Wait() wg.Wait()
close(putReport.Results()) close(putReport.Results())
bar.Finish() bar.Finish()
fmt.Printf("\nPut summary:\n%s", <-putReportResults) fmt.Printf("\nPut summary:\n%s", <-putReportResults)
for i := 0; i < len(wchs); i++ {
for j := 0; j < watchLPutTotal; j++ {
start := putTimes[j]
end := eventTimes[i][j]
if end.Before(start) {
start = end
}
watchReport.Results() <- report.Result{Start: start, End: end}
}
}
close(watchReport.Results())
fmt.Printf("\nWatch events summary:\n%s", <-watchReportResults)
} }
func setupWatchChannels(key string) []clientv3.WatchChan { func setupWatchChannels(key string) []clientv3.WatchChan {