mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
benchmark: refactor watch benchmark
This commit is contained in:
parent
55de54a757
commit
6171334595
@ -15,6 +15,7 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
@ -22,11 +23,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/pkg/report"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/time/rate"
|
||||
"gopkg.in/cheggaaa/pb.v1"
|
||||
)
|
||||
|
||||
@ -50,9 +51,9 @@ Each key is watched by (--total/--watched-key-total) watchers.
|
||||
}
|
||||
|
||||
var (
|
||||
watchTotalStreams int
|
||||
watchTotal int
|
||||
watchedKeyTotal int
|
||||
watchStreams int
|
||||
watchWatchesPerStream int
|
||||
watchedKeyTotal int
|
||||
|
||||
watchPutRate int
|
||||
watchPutTotal int
|
||||
@ -60,23 +61,27 @@ var (
|
||||
watchKeySize int
|
||||
watchKeySpaceSize int
|
||||
watchSeqKeys bool
|
||||
|
||||
eventsTotal int
|
||||
|
||||
nrWatchCompleted int32
|
||||
nrRecvCompleted int32
|
||||
watchCompletedNotifier chan struct{}
|
||||
recvCompletedNotifier chan struct{}
|
||||
)
|
||||
|
||||
type watchedKeys struct {
|
||||
watched []string
|
||||
numWatchers map[string]int
|
||||
|
||||
watches []clientv3.WatchChan
|
||||
|
||||
// ctx to control all watches
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func init() {
|
||||
RootCmd.AddCommand(watchCmd)
|
||||
watchCmd.Flags().IntVar(&watchTotalStreams, "watchers", 10000, "Total number of watchers")
|
||||
watchCmd.Flags().IntVar(&watchTotal, "total", 100000, "Total number of watch requests")
|
||||
watchCmd.Flags().IntVar(&watchedKeyTotal, "watched-key-total", 10000, "Total number of keys to be watched")
|
||||
watchCmd.Flags().IntVar(&watchStreams, "streams", 10, "Total watch streams")
|
||||
watchCmd.Flags().IntVar(&watchWatchesPerStream, "watch-per-stream", 100, "Total watchers per stream")
|
||||
watchCmd.Flags().IntVar(&watchedKeyTotal, "watched-key-total", 1, "Total number of keys to be watched")
|
||||
|
||||
watchCmd.Flags().IntVar(&watchPutRate, "put-rate", 100, "Number of keys to put per second")
|
||||
watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 10000, "Number of put requests")
|
||||
watchCmd.Flags().IntVar(&watchPutRate, "put-rate", 0, "Number of keys to put per second")
|
||||
watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 1000, "Number of put requests")
|
||||
|
||||
watchCmd.Flags().IntVar(&watchKeySize, "key-size", 32, "Key size of watch request")
|
||||
watchCmd.Flags().IntVar(&watchKeySpaceSize, "key-space-size", 1, "Maximum possible keys")
|
||||
@ -88,9 +93,76 @@ func watchFunc(cmd *cobra.Command, args []string) {
|
||||
fmt.Fprintf(os.Stderr, "expected positive --key-space-size, got (%v)", watchKeySpaceSize)
|
||||
os.Exit(1)
|
||||
}
|
||||
grpcConns := int(totalClients)
|
||||
if totalClients > totalConns {
|
||||
grpcConns = int(totalConns)
|
||||
}
|
||||
wantedConns := 1 + (watchStreams / 100)
|
||||
if grpcConns < wantedConns {
|
||||
fmt.Fprintf(os.Stderr, "warning: grpc limits 100 streams per client connection, have %d but need %d\n", grpcConns, wantedConns)
|
||||
}
|
||||
clients := mustCreateClients(totalClients, totalConns)
|
||||
wk := newWatchedKeys()
|
||||
benchMakeWatches(clients, wk)
|
||||
benchPutWatches(clients, wk)
|
||||
}
|
||||
|
||||
func benchMakeWatches(clients []*clientv3.Client, wk *watchedKeys) {
|
||||
streams := make([]clientv3.Watcher, watchStreams)
|
||||
for i := range streams {
|
||||
streams[i] = clientv3.NewWatcher(clients[i%len(clients)])
|
||||
}
|
||||
|
||||
keyc := make(chan string, watchStreams)
|
||||
bar = pb.New(watchStreams * watchWatchesPerStream)
|
||||
bar.Format("Bom !")
|
||||
bar.Start()
|
||||
|
||||
r := newReport()
|
||||
rch := r.Results()
|
||||
|
||||
wg.Add(len(streams) + 1)
|
||||
wc := make(chan []clientv3.WatchChan, len(streams))
|
||||
for _, s := range streams {
|
||||
go func(s clientv3.Watcher) {
|
||||
defer wg.Done()
|
||||
var ws []clientv3.WatchChan
|
||||
for i := 0; i < watchWatchesPerStream; i++ {
|
||||
k := <-keyc
|
||||
st := time.Now()
|
||||
wch := s.Watch(wk.ctx, k)
|
||||
rch <- report.Result{Start: st, End: time.Now()}
|
||||
ws = append(ws, wch)
|
||||
bar.Increment()
|
||||
}
|
||||
wc <- ws
|
||||
}(s)
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
close(keyc)
|
||||
wg.Done()
|
||||
}()
|
||||
for i := 0; i < watchStreams*watchWatchesPerStream; i++ {
|
||||
key := wk.watched[i%len(wk.watched)]
|
||||
keyc <- key
|
||||
wk.numWatchers[key]++
|
||||
}
|
||||
}()
|
||||
|
||||
rc := r.Run()
|
||||
wg.Wait()
|
||||
bar.Finish()
|
||||
close(r.Results())
|
||||
fmt.Printf("Watch creation summary:\n%s", <-rc)
|
||||
|
||||
for i := 0; i < len(streams); i++ {
|
||||
wk.watches = append(wk.watches, (<-wc)...)
|
||||
}
|
||||
}
|
||||
|
||||
func newWatchedKeys() *watchedKeys {
|
||||
watched := make([]string, watchedKeyTotal)
|
||||
numWatchers := make(map[string]int)
|
||||
for i := range watched {
|
||||
k := make([]byte, watchKeySize)
|
||||
if watchSeqKeys {
|
||||
@ -100,112 +172,76 @@ func watchFunc(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
watched[i] = string(k)
|
||||
}
|
||||
|
||||
requests := make(chan string, totalClients)
|
||||
|
||||
clients := mustCreateClients(totalClients, totalConns)
|
||||
|
||||
streams := make([]v3.Watcher, watchTotalStreams)
|
||||
for i := range streams {
|
||||
streams[i] = v3.NewWatcher(clients[i%len(clients)])
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
return &watchedKeys{
|
||||
watched: watched,
|
||||
numWatchers: make(map[string]int),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// watching phase
|
||||
bar = pb.New(watchTotal)
|
||||
bar.Format("Bom !")
|
||||
bar.Start()
|
||||
|
||||
atomic.StoreInt32(&nrWatchCompleted, int32(0))
|
||||
watchCompletedNotifier = make(chan struct{})
|
||||
|
||||
r := report.NewReportRate("%4.4f")
|
||||
for i := range streams {
|
||||
go doWatch(streams[i], requests, r.Results())
|
||||
}
|
||||
|
||||
go func() {
|
||||
for i := 0; i < watchTotal; i++ {
|
||||
key := watched[i%len(watched)]
|
||||
requests <- key
|
||||
numWatchers[key]++
|
||||
}
|
||||
close(requests)
|
||||
}()
|
||||
|
||||
rc := r.Run()
|
||||
<-watchCompletedNotifier
|
||||
bar.Finish()
|
||||
close(r.Results())
|
||||
fmt.Printf("Watch creation summary:\n%s", <-rc)
|
||||
|
||||
// put phase
|
||||
eventsTotal = 0
|
||||
func benchPutWatches(clients []*clientv3.Client, wk *watchedKeys) {
|
||||
eventsTotal := 0
|
||||
for i := 0; i < watchPutTotal; i++ {
|
||||
eventsTotal += numWatchers[watched[i%len(watched)]]
|
||||
eventsTotal += wk.numWatchers[wk.watched[i%len(wk.watched)]]
|
||||
}
|
||||
|
||||
bar = pb.New(eventsTotal)
|
||||
bar.Format("Bom !")
|
||||
bar.Start()
|
||||
|
||||
atomic.StoreInt32(&nrRecvCompleted, 0)
|
||||
recvCompletedNotifier = make(chan struct{})
|
||||
putreqc := make(chan v3.Op)
|
||||
r := newReport()
|
||||
|
||||
r = report.NewReportRate("%4.4f")
|
||||
for i := 0; i < watchPutTotal; i++ {
|
||||
go func(c *v3.Client) {
|
||||
for op := range putreqc {
|
||||
if _, err := c.Do(context.TODO(), op); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}(clients[i%len(clients)])
|
||||
wg.Add(len(wk.watches))
|
||||
nrRxed := int32(eventsTotal)
|
||||
for _, w := range wk.watches {
|
||||
go func(wc clientv3.WatchChan) {
|
||||
defer wg.Done()
|
||||
recvWatchChan(wc, r.Results(), &nrRxed)
|
||||
wk.cancel()
|
||||
}(w)
|
||||
}
|
||||
|
||||
putreqc := make(chan clientv3.Op, len(clients))
|
||||
go func() {
|
||||
defer close(putreqc)
|
||||
for i := 0; i < watchPutTotal; i++ {
|
||||
putreqc <- v3.OpPut(watched[i%(len(watched))], "data")
|
||||
// TODO: use a real rate-limiter instead of sleep.
|
||||
time.Sleep(time.Second / time.Duration(watchPutRate))
|
||||
putreqc <- clientv3.OpPut(wk.watched[i%(len(wk.watched))], "data")
|
||||
}
|
||||
close(putreqc)
|
||||
}()
|
||||
|
||||
rc = r.Run()
|
||||
<-recvCompletedNotifier
|
||||
limit := rate.NewLimiter(rate.Limit(watchPutRate), 1)
|
||||
for _, cc := range clients {
|
||||
go func(c *clientv3.Client) {
|
||||
for op := range putreqc {
|
||||
if err := limit.Wait(context.TODO()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if _, err := c.Do(context.TODO(), op); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}(cc)
|
||||
}
|
||||
|
||||
rc := r.Run()
|
||||
wg.Wait()
|
||||
bar.Finish()
|
||||
close(r.Results())
|
||||
fmt.Printf("Watch events received summary:\n%s", <-rc)
|
||||
|
||||
}
|
||||
|
||||
func doWatch(stream v3.Watcher, requests <-chan string, results chan<- report.Result) {
|
||||
for r := range requests {
|
||||
st := time.Now()
|
||||
wch := stream.Watch(context.TODO(), r)
|
||||
results <- report.Result{Start: st, End: time.Now()}
|
||||
bar.Increment()
|
||||
go recvWatchChan(wch, results)
|
||||
}
|
||||
atomic.AddInt32(&nrWatchCompleted, 1)
|
||||
if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
|
||||
watchCompletedNotifier <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func recvWatchChan(wch v3.WatchChan, results chan<- report.Result) {
|
||||
func recvWatchChan(wch clientv3.WatchChan, results chan<- report.Result, nrRxed *int32) {
|
||||
for r := range wch {
|
||||
st := time.Now()
|
||||
for range r.Events {
|
||||
results <- report.Result{Start: st, End: time.Now()}
|
||||
bar.Increment()
|
||||
atomic.AddInt32(&nrRecvCompleted, 1)
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) {
|
||||
recvCompletedNotifier <- struct{}{}
|
||||
break
|
||||
if atomic.AddInt32(nrRxed, -1) <= 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user