From 298c1e2487478912af1b7c224da98ba69b96e4a0 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 25 Feb 2016 13:09:34 -0800 Subject: [PATCH] tools/benchmark: port to clientv3 API --- tools/benchmark/cmd/put.go | 16 +++++----- tools/benchmark/cmd/range.go | 23 ++++++++------- tools/benchmark/cmd/watch.go | 57 ++++++++++++++---------------------- 3 files changed, 42 insertions(+), 54 deletions(-) diff --git a/tools/benchmark/cmd/put.go b/tools/benchmark/cmd/put.go index aa0904e44..85c8792c7 100644 --- a/tools/benchmark/cmd/put.go +++ b/tools/benchmark/cmd/put.go @@ -24,7 +24,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) // putCmd represents the put command @@ -61,10 +61,10 @@ func putFunc(cmd *cobra.Command, args []string) { } results = make(chan result) - requests := make(chan etcdserverpb.PutRequest, totalClients) + requests := make(chan v3.Op, totalClients) bar = pb.New(putTotal) - k, v := make([]byte, keySize), mustRandBytes(valSize) + k, v := make([]byte, keySize), string(mustRandBytes(valSize)) clients := mustCreateClients(totalClients, totalConns) @@ -73,7 +73,7 @@ func putFunc(cmd *cobra.Command, args []string) { for i := range clients { wg.Add(1) - go doPut(context.Background(), clients[i].KV, requests) + go doPut(context.Background(), clients[i], requests) } pdoneC := printReport(results) @@ -85,7 +85,7 @@ func putFunc(cmd *cobra.Command, args []string) { } else { binary.PutVarint(k, int64(rand.Intn(keySpaceSize))) } - requests <- etcdserverpb.PutRequest{Key: k, Value: v} + requests <- v3.OpPut(string(k), v) } close(requests) }() @@ -98,12 +98,12 @@ func putFunc(cmd *cobra.Command, args []string) { <-pdoneC } -func doPut(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) { +func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) { defer wg.Done() - for r := range requests { + for op := range requests { st := time.Now() - _, err := client.Put(ctx, &r) + _, err := client.Do(ctx, op) var errStr string if err != nil { diff --git a/tools/benchmark/cmd/range.go b/tools/benchmark/cmd/range.go index da04720dd..3471e2f43 100644 --- a/tools/benchmark/cmd/range.go +++ b/tools/benchmark/cmd/range.go @@ -22,7 +22,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" ) // rangeCmd represents the range command @@ -50,10 +50,10 @@ func rangeFunc(cmd *cobra.Command, args []string) { os.Exit(1) } - k := []byte(args[0]) - var end []byte + k := args[0] + end := "" if len(args) == 2 { - end = []byte(args[1]) + end = args[1] } if rangeConsistency == "l" { @@ -66,7 +66,7 @@ func rangeFunc(cmd *cobra.Command, args []string) { } results = make(chan result) - requests := make(chan etcdserverpb.RangeRequest, totalClients) + requests := make(chan v3.Op, totalClients) bar = pb.New(rangeTotal) clients := mustCreateClients(totalClients, totalConns) @@ -83,11 +83,12 @@ func rangeFunc(cmd *cobra.Command, args []string) { go func() { for i := 0; i < rangeTotal; i++ { - r := etcdserverpb.RangeRequest{Key: k, RangeEnd: end} + opts := []v3.OpOption{v3.WithRange(end)} if rangeConsistency == "s" { - r.Serializable = true + opts = append(opts, v3.WithSerializable()) } - requests <- r + op := v3.OpGet(k, opts...) + requests <- op } close(requests) }() @@ -100,12 +101,12 @@ func rangeFunc(cmd *cobra.Command, args []string) { <-pdoneC } -func doRange(client etcdserverpb.KVClient, requests <-chan etcdserverpb.RangeRequest) { +func doRange(client v3.KV, requests <-chan v3.Op) { defer wg.Done() - for req := range requests { + for op := range requests { st := time.Now() - _, err := client.Range(context.Background(), &req) + _, err := client.Do(context.Background(), op) var errStr string if err != nil { diff --git a/tools/benchmark/cmd/watch.go b/tools/benchmark/cmd/watch.go index 1a73f33c0..d80a6d4b7 100644 --- a/tools/benchmark/cmd/watch.go +++ b/tools/benchmark/cmd/watch.go @@ -20,7 +20,7 @@ import ( "sync/atomic" "time" - "github.com/coreos/etcd/etcdserver/etcdserverpb" + v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" @@ -73,23 +73,18 @@ func init() { } func watchFunc(cmd *cobra.Command, args []string) { - watched := make([][]byte, watchedKeyTotal) + watched := make([]string, watchedKeyTotal) for i := range watched { - watched[i] = mustRandBytes(32) + watched[i] = string(mustRandBytes(32)) } - requests := make(chan etcdserverpb.WatchRequest, totalClients) + requests := make(chan string, totalClients) clients := mustCreateClients(totalClients, totalConns) - streams := make([]etcdserverpb.Watch_WatchClient, watchTotalStreams) - var err error + streams := make([]v3.Watcher, watchTotalStreams) for i := range streams { - streams[i], err = clients[i%len(clients)].Watch.Watch(context.TODO()) - if err != nil { - fmt.Fprintln(os.Stderr, "Failed to create watch stream:", err) - os.Exit(1) - } + streams[i] = v3.NewWatcher(clients[i%len(clients)]) } putStartNotifier = make(chan struct{}) @@ -111,10 +106,7 @@ func watchFunc(cmd *cobra.Command, args []string) { go func() { for i := 0; i < watchTotal; i++ { - requests <- etcdserverpb.WatchRequest{ - RequestUnion: &etcdserverpb.WatchRequest_CreateRequest{ - CreateRequest: &etcdserverpb.WatchCreateRequest{ - Key: watched[i%(len(watched))]}}} + requests <- watched[i%len(watched)] } close(requests) }() @@ -139,7 +131,7 @@ func watchFunc(cmd *cobra.Command, args []string) { recvCompletedNotifier = make(chan struct{}) close(putStartNotifier) - putreqc := make(chan etcdserverpb.PutRequest) + putreqc := make(chan v3.Op) for i := 0; i < watchPutTotal; i++ { go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc) @@ -149,10 +141,7 @@ func watchFunc(cmd *cobra.Command, args []string) { go func() { for i := 0; i < eventsTotal; i++ { - putreqc <- etcdserverpb.PutRequest{ - Key: watched[i%(len(watched))], - Value: []byte("data"), - } + putreqc <- v3.OpPut(watched[i%(len(watched))], "data") // TODO: use a real rate-limiter instead of sleep. time.Sleep(time.Second / time.Duration(watchPutRate)) } @@ -166,16 +155,17 @@ func watchFunc(cmd *cobra.Command, args []string) { <-pdoneC } -func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb.WatchRequest) { +func doWatch(stream v3.Watcher, requests <-chan string) { for r := range requests { st := time.Now() - err := stream.Send(&r) + wch := stream.Watch(context.TODO(), r) var errStr string - if err != nil { - errStr = err.Error() + if wch == nil { + errStr = "could not open watch channel" } results <- result{errStr: errStr, duration: time.Since(st)} bar.Increment() + go recvWatchChan(wch) } atomic.AddInt32(&nrWatchCompleted, 1) if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) { @@ -183,15 +173,12 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb } <-putStartNotifier +} - for { +func recvWatchChan(wch v3.WatchChan) { + for range wch { st := time.Now() - _, err := stream.Recv() - var errStr string - if err != nil { - errStr = err.Error() - } - results <- result{errStr: errStr, duration: time.Since(st)} + results <- result{duration: time.Since(st)} bar.Increment() atomic.AddInt32(&nrRecvCompleted, 1) @@ -201,11 +188,11 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb } } -func doPutForWatch(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) { - for r := range requests { - _, err := client.Put(ctx, &r) +func doPutForWatch(ctx context.Context, client v3.KV, requests <-chan v3.Op) { + for op := range requests { + _, err := client.Do(ctx, op) if err != nil { - fmt.Fprintln(os.Stderr, "failed to Put for watch benchmark: %s", err) + fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err) os.Exit(1) } }