tools/benchmark: port to clientv3 API

This commit is contained in:
Anthony Romano 2016-02-25 13:09:34 -08:00
parent 5f62c05a6d
commit 298c1e2487
3 changed files with 42 additions and 54 deletions

View File

@ -24,7 +24,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
v3 "github.com/coreos/etcd/clientv3"
)
// putCmd represents the put command
@ -61,10 +61,10 @@ func putFunc(cmd *cobra.Command, args []string) {
}
results = make(chan result)
requests := make(chan etcdserverpb.PutRequest, totalClients)
requests := make(chan v3.Op, totalClients)
bar = pb.New(putTotal)
k, v := make([]byte, keySize), mustRandBytes(valSize)
k, v := make([]byte, keySize), string(mustRandBytes(valSize))
clients := mustCreateClients(totalClients, totalConns)
@ -73,7 +73,7 @@ func putFunc(cmd *cobra.Command, args []string) {
for i := range clients {
wg.Add(1)
go doPut(context.Background(), clients[i].KV, requests)
go doPut(context.Background(), clients[i], requests)
}
pdoneC := printReport(results)
@ -85,7 +85,7 @@ func putFunc(cmd *cobra.Command, args []string) {
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
requests <- etcdserverpb.PutRequest{Key: k, Value: v}
requests <- v3.OpPut(string(k), v)
}
close(requests)
}()
@ -98,12 +98,12 @@ func putFunc(cmd *cobra.Command, args []string) {
<-pdoneC
}
func doPut(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) {
func doPut(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
defer wg.Done()
for r := range requests {
for op := range requests {
st := time.Now()
_, err := client.Put(ctx, &r)
_, err := client.Do(ctx, op)
var errStr string
if err != nil {

View File

@ -22,7 +22,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
v3 "github.com/coreos/etcd/clientv3"
)
// rangeCmd represents the range command
@ -50,10 +50,10 @@ func rangeFunc(cmd *cobra.Command, args []string) {
os.Exit(1)
}
k := []byte(args[0])
var end []byte
k := args[0]
end := ""
if len(args) == 2 {
end = []byte(args[1])
end = args[1]
}
if rangeConsistency == "l" {
@ -66,7 +66,7 @@ func rangeFunc(cmd *cobra.Command, args []string) {
}
results = make(chan result)
requests := make(chan etcdserverpb.RangeRequest, totalClients)
requests := make(chan v3.Op, totalClients)
bar = pb.New(rangeTotal)
clients := mustCreateClients(totalClients, totalConns)
@ -83,11 +83,12 @@ func rangeFunc(cmd *cobra.Command, args []string) {
go func() {
for i := 0; i < rangeTotal; i++ {
r := etcdserverpb.RangeRequest{Key: k, RangeEnd: end}
opts := []v3.OpOption{v3.WithRange(end)}
if rangeConsistency == "s" {
r.Serializable = true
opts = append(opts, v3.WithSerializable())
}
requests <- r
op := v3.OpGet(k, opts...)
requests <- op
}
close(requests)
}()
@ -100,12 +101,12 @@ func rangeFunc(cmd *cobra.Command, args []string) {
<-pdoneC
}
func doRange(client etcdserverpb.KVClient, requests <-chan etcdserverpb.RangeRequest) {
func doRange(client v3.KV, requests <-chan v3.Op) {
defer wg.Done()
for req := range requests {
for op := range requests {
st := time.Now()
_, err := client.Range(context.Background(), &req)
_, err := client.Do(context.Background(), op)
var errStr string
if err != nil {

View File

@ -20,7 +20,7 @@ import (
"sync/atomic"
"time"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/cheggaaa/pb"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
@ -73,23 +73,18 @@ func init() {
}
func watchFunc(cmd *cobra.Command, args []string) {
watched := make([][]byte, watchedKeyTotal)
watched := make([]string, watchedKeyTotal)
for i := range watched {
watched[i] = mustRandBytes(32)
watched[i] = string(mustRandBytes(32))
}
requests := make(chan etcdserverpb.WatchRequest, totalClients)
requests := make(chan string, totalClients)
clients := mustCreateClients(totalClients, totalConns)
streams := make([]etcdserverpb.Watch_WatchClient, watchTotalStreams)
var err error
streams := make([]v3.Watcher, watchTotalStreams)
for i := range streams {
streams[i], err = clients[i%len(clients)].Watch.Watch(context.TODO())
if err != nil {
fmt.Fprintln(os.Stderr, "Failed to create watch stream:", err)
os.Exit(1)
}
streams[i] = v3.NewWatcher(clients[i%len(clients)])
}
putStartNotifier = make(chan struct{})
@ -111,10 +106,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
go func() {
for i := 0; i < watchTotal; i++ {
requests <- etcdserverpb.WatchRequest{
RequestUnion: &etcdserverpb.WatchRequest_CreateRequest{
CreateRequest: &etcdserverpb.WatchCreateRequest{
Key: watched[i%(len(watched))]}}}
requests <- watched[i%len(watched)]
}
close(requests)
}()
@ -139,7 +131,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
recvCompletedNotifier = make(chan struct{})
close(putStartNotifier)
putreqc := make(chan etcdserverpb.PutRequest)
putreqc := make(chan v3.Op)
for i := 0; i < watchPutTotal; i++ {
go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc)
@ -149,10 +141,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
go func() {
for i := 0; i < eventsTotal; i++ {
putreqc <- etcdserverpb.PutRequest{
Key: watched[i%(len(watched))],
Value: []byte("data"),
}
putreqc <- v3.OpPut(watched[i%(len(watched))], "data")
// TODO: use a real rate-limiter instead of sleep.
time.Sleep(time.Second / time.Duration(watchPutRate))
}
@ -166,16 +155,17 @@ func watchFunc(cmd *cobra.Command, args []string) {
<-pdoneC
}
func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb.WatchRequest) {
func doWatch(stream v3.Watcher, requests <-chan string) {
for r := range requests {
st := time.Now()
err := stream.Send(&r)
wch := stream.Watch(context.TODO(), r)
var errStr string
if err != nil {
errStr = err.Error()
if wch == nil {
errStr = "could not open watch channel"
}
results <- result{errStr: errStr, duration: time.Since(st)}
bar.Increment()
go recvWatchChan(wch)
}
atomic.AddInt32(&nrWatchCompleted, 1)
if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
@ -183,15 +173,12 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb
}
<-putStartNotifier
}
for {
func recvWatchChan(wch v3.WatchChan) {
for range wch {
st := time.Now()
_, err := stream.Recv()
var errStr string
if err != nil {
errStr = err.Error()
}
results <- result{errStr: errStr, duration: time.Since(st)}
results <- result{duration: time.Since(st)}
bar.Increment()
atomic.AddInt32(&nrRecvCompleted, 1)
@ -201,11 +188,11 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb
}
}
func doPutForWatch(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) {
for r := range requests {
_, err := client.Put(ctx, &r)
func doPutForWatch(ctx context.Context, client v3.KV, requests <-chan v3.Op) {
for op := range requests {
_, err := client.Do(ctx, op)
if err != nil {
fmt.Fprintln(os.Stderr, "failed to Put for watch benchmark: %s", err)
fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err)
os.Exit(1)
}
}