From b94b8b5707065b91d702bb6ccaaad7f8af2a65b8 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 25 Apr 2017 10:19:20 -0700 Subject: [PATCH 1/6] etcd-runner: move root cmd into command package this allows easier sharing of global variable for sub commands. --- .../etcd-runner/{ => command}/help.go | 2 +- .../etcd-runner/command/root.go | 69 +++++++++++++++++++ tools/functional-tester/etcd-runner/main.go | 57 +-------------- 3 files changed, 72 insertions(+), 56 deletions(-) rename tools/functional-tester/etcd-runner/{ => command}/help.go (99%) create mode 100644 tools/functional-tester/etcd-runner/command/root.go diff --git a/tools/functional-tester/etcd-runner/help.go b/tools/functional-tester/etcd-runner/command/help.go similarity index 99% rename from tools/functional-tester/etcd-runner/help.go rename to tools/functional-tester/etcd-runner/command/help.go index c68f8de38..e7d7a4e89 100644 --- a/tools/functional-tester/etcd-runner/help.go +++ b/tools/functional-tester/etcd-runner/command/help.go @@ -14,7 +14,7 @@ // copied from https://github.com/rkt/rkt/blob/master/rkt/help.go -package main +package command import ( "bytes" diff --git a/tools/functional-tester/etcd-runner/command/root.go b/tools/functional-tester/etcd-runner/command/root.go new file mode 100644 index 000000000..4ecb6fcf8 --- /dev/null +++ b/tools/functional-tester/etcd-runner/command/root.go @@ -0,0 +1,69 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "log" + "math/rand" + "time" + + "github.com/spf13/cobra" +) + +const ( + cliName = "etcd-runner" + cliDescription = "Stress tests using clientv3 functionality.." + + defaultDialTimeout = 2 * time.Second +) + +var ( + rootCmd = &cobra.Command{ + Use: cliName, + Short: cliDescription, + SuggestFor: []string{"etcd-runner"}, + } +) + +func init() { + cobra.EnablePrefixMatching = true + + rand.Seed(time.Now().UnixNano()) + + log.SetFlags(log.Lmicroseconds) + + rootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints") + rootCmd.PersistentFlags().DurationVar(&dialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections") + rootCmd.PersistentFlags().IntVar(&reqRate, "req-rate", 30, "maximum number of requests per second") + rootCmd.PersistentFlags().IntVar(&rounds, "rounds", 100, "number of rounds to run; 0 to run forever") + + rootCmd.AddCommand( + NewElectionCommand(), + NewLeaseRenewerCommand(), + NewLockRacerCommand(), + NewWatchCommand(), + ) +} + +func Start() { + rootCmd.SetUsageFunc(usageFunc) + + // Make help just show the usage + rootCmd.SetHelpTemplate(`{{.UsageString}}`) + + if err := rootCmd.Execute(); err != nil { + ExitWithError(ExitError, err) + } +} diff --git a/tools/functional-tester/etcd-runner/main.go b/tools/functional-tester/etcd-runner/main.go index 82dbc309f..04fede098 100644 --- a/tools/functional-tester/etcd-runner/main.go +++ b/tools/functional-tester/etcd-runner/main.go @@ -15,61 +15,8 @@ // etcd-runner is a command line application that performs tests on etcd. package main -import ( - "log" - "time" - - "github.com/coreos/etcd/tools/functional-tester/etcd-runner/command" - "github.com/spf13/cobra" -) - -const ( - cliName = "etcd-runner" - cliDescription = "Stress tests using clientv3 functionality.." - - defaultDialTimeout = 2 * time.Second -) - -var ( - globalFlags = command.GlobalFlags{} -) - -var ( - rootCmd = &cobra.Command{ - Use: cliName, - Short: cliDescription, - SuggestFor: []string{"etcd-runner"}, - } -) - -func init() { - log.SetFlags(log.Lmicroseconds) - rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints") - rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections") - - rootCmd.AddCommand( - command.NewElectionCommand(), - command.NewLeaseRenewerCommand(), - command.NewLockRacerCommand(), - command.NewWatchCommand(), - ) -} - -func init() { - cobra.EnablePrefixMatching = true -} - -func Start() { - rootCmd.SetUsageFunc(usageFunc) - - // Make help just show the usage - rootCmd.SetHelpTemplate(`{{.UsageString}}`) - - if err := rootCmd.Execute(); err != nil { - command.ExitWithError(command.ExitError, err) - } -} +import "github.com/coreos/etcd/tools/functional-tester/etcd-runner/command" func main() { - Start() + command.Start() } From fa85445ef8fb71cf2bffac898e42c172f3fcbe3e Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 25 Apr 2017 10:22:11 -0700 Subject: [PATCH 2/6] etcd-runner: add rate limiting in doRounds() --- .../etcd-runner/command/global.go | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/tools/functional-tester/etcd-runner/command/global.go b/tools/functional-tester/etcd-runner/command/global.go index 38d6de38c..1d26f2047 100644 --- a/tools/functional-tester/etcd-runner/command/global.go +++ b/tools/functional-tester/etcd-runner/command/global.go @@ -15,6 +15,7 @@ package command import ( + "context" "fmt" "log" "sync" @@ -23,25 +24,18 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/spf13/cobra" + "golang.org/x/time/rate" ) +// shared flags var ( - rounds int // total number of rounds the operation needs to be performed - totalClientConnections int // total number of client connections to be made with server - noOfPrefixes int // total number of prefixes which will be watched upon - watchPerPrefix int // number of watchers per prefix - reqRate int // put request per second - totalKeys int // total number of keys for operation - runningTime time.Duration // time for which operation should be performed + totalClientConnections int // total number of client connections to be made with server + endpoints []string + dialTimeout time.Duration + rounds int // total number of rounds to run; set to <= 0 to run forever. + reqRate int // maximum number of requests per second. ) -// GlobalFlags are flags that defined globally -// and are inherited to all sub-commands. -type GlobalFlags struct { - Endpoints []string - DialTimeout time.Duration -} - type roundClient struct { c *clientv3.Client progress int @@ -61,16 +55,21 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client { return c } -func doRounds(rcs []roundClient, rounds int) { +func doRounds(rcs []roundClient, rounds int, requests int) { var mu sync.Mutex var wg sync.WaitGroup wg.Add(len(rcs)) finished := make(chan struct{}) + limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate) for i := range rcs { go func(rc *roundClient) { defer wg.Done() - for rc.progress < rounds { + for rc.progress < rounds || rounds <= 0 { + if err := limiter.WaitN(context.Background(), requests/len(rcs)); err != nil { + log.Panicf("rate limiter error %v", err) + } + for rc.acquire() != nil { /* spin */ } @@ -85,7 +84,7 @@ func doRounds(rcs []roundClient, rounds int) { finished <- struct{}{} mu.Lock() - for rc.release() != nil { + for rc.release() != nil { /* spin */ mu.Unlock() mu.Lock() } @@ -95,7 +94,7 @@ func doRounds(rcs []roundClient, rounds int) { } start := time.Now() - for i := 1; i < len(rcs)*rounds+1; i++ { + for i := 1; i < len(rcs)*rounds+1 || rounds <= 0; i++ { select { case <-finished: if i%100 == 0 { From d57ad8ec8db0445b6809c10bd05b55b6b114eb49 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 25 Apr 2017 10:25:05 -0700 Subject: [PATCH 3/6] etcd-runner: add barrier, observe !ok handling, and election name arg to election-runner. --- .../etcd-runner/command/election_command.go | 65 ++++++++++++------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/tools/functional-tester/etcd-runner/command/election_command.go b/tools/functional-tester/etcd-runner/command/election_command.go index 2dd60de5e..e45c9ec11 100644 --- a/tools/functional-tester/etcd-runner/command/election_command.go +++ b/tools/functional-tester/etcd-runner/command/election_command.go @@ -20,32 +20,34 @@ import ( "fmt" "github.com/coreos/etcd/clientv3/concurrency" + "github.com/spf13/cobra" ) // NewElectionCommand returns the cobra command for "election runner". func NewElectionCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "election", + Use: "election [election name (defaults to 'elector')]", Short: "Performs election operation", Run: runElectionFunc, } - cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run") cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections") return cmd } func runElectionFunc(cmd *cobra.Command, args []string) { - if len(args) > 0 { - ExitWithError(ExitBadArgs, errors.New("election does not take any argument")) + election := "elector" + if len(args) == 1 { + election = args[0] + } + if len(args) > 1 { + ExitWithError(ExitBadArgs, errors.New("election takes at most one argument")) } rcs := make([]roundClient, totalClientConnections) - validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs)) - for range rcs { - releasec <- struct{}{} - } - + validatec := make(chan struct{}, len(rcs)) + // nextc closes when election is ready for next round. + nextc := make(chan struct{}) eps := endpointsFromFlag(cmd) dialTimeout := dialTimeoutFromCmd(cmd) @@ -53,6 +55,10 @@ func runElectionFunc(cmd *cobra.Command, args []string) { v := fmt.Sprintf("%d", i) observedLeader := "" validateWaiters := 0 + var rcNextc chan struct{} + setRcNextc := func() { + rcNextc = nextc + } rcs[i].c = newClient(eps, dialTimeout) var ( @@ -65,18 +71,22 @@ func runElectionFunc(cmd *cobra.Command, args []string) { break } } - e := concurrency.NewElection(s, "electors") - rcs[i].acquire = func() error { - <-releasec + e := concurrency.NewElection(s, election) + rcs[i].acquire = func() (err error) { ctx, cancel := context.WithCancel(context.Background()) + donec := make(chan struct{}) go func() { - if ol, ok := <-e.Observe(ctx); ok { - observedLeader = string(ol.Kvs[0].Value) - if observedLeader != v { - cancel() + defer close(donec) + for ctx.Err() == nil { + if ol, ok := <-e.Observe(ctx); ok { + observedLeader = string(ol.Kvs[0].Value) + break } } + if observedLeader != v { + cancel() + } }() err = e.Campaign(ctx, v) if err == nil { @@ -85,18 +95,24 @@ func runElectionFunc(cmd *cobra.Command, args []string) { if observedLeader == v { validateWaiters = len(rcs) } + cancel() + <-donec select { case <-ctx.Done(): return nil default: - cancel() return err } } rcs[i].validate = func() error { - if l, err := e.Leader(context.TODO()); err == nil && string(l.Kvs[0].Value) != observedLeader { - return fmt.Errorf("expected leader %q, got %q", observedLeader, l) + l, err := e.Leader(context.TODO()) + if err == nil && string(l.Kvs[0].Value) != observedLeader { + return fmt.Errorf("expected leader %q, got %q", observedLeader, l.Kvs[0].Value) } + if err != nil { + return err + } + setRcNextc() validatec <- struct{}{} return nil } @@ -113,14 +129,15 @@ func runElectionFunc(cmd *cobra.Command, args []string) { return err } if observedLeader == v { - for range rcs { - releasec <- struct{}{} - } + close(nextc) + nextc = make(chan struct{}) } + <-rcNextc observedLeader = "" return nil } } - - doRounds(rcs, rounds) + // each client creates 1 key from Campaign() and delete it from Resign() + // a round involves in 2*len(rcs) requests. + doRounds(rcs, rounds, 2*len(rcs)) } From 72fb756af348374291c61d53351eec616faaf96e Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 25 Apr 2017 10:27:52 -0700 Subject: [PATCH 4/6] etcd-runner: add lease ttl as a flag and fatal when err in lease-runner. --- .../etcd-runner/command/lease_renewer_command.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go index e5257d430..439e267d5 100644 --- a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go +++ b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go @@ -22,11 +22,16 @@ import ( "time" "github.com/coreos/etcd/clientv3" + "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) +var ( + leaseTTL int64 +) + // NewLeaseRenewerCommand returns the cobra command for "lease-renewer runner". func NewLeaseRenewerCommand() *cobra.Command { cmd := &cobra.Command{ @@ -34,6 +39,7 @@ func NewLeaseRenewerCommand() *cobra.Command { Short: "Performs lease renew operation", Run: runLeaseRenewerFunc, } + cmd.Flags().Int64Var(&leaseTTL, "ttl", 5, "lease's ttl") return cmd } @@ -54,7 +60,7 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) { err error ) for { - l, err = c.Lease.Grant(ctx, 5) + l, err = c.Lease.Grant(ctx, leaseTTL) if err == nil { break } @@ -65,14 +71,14 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) { lk, err = c.Lease.KeepAliveOnce(ctx, l.ID) if grpc.Code(err) == codes.NotFound { if time.Since(expire) < 0 { - log.Printf("bad renew! exceeded: %v", time.Since(expire)) + log.Fatalf("bad renew! exceeded: %v", time.Since(expire)) for { lk, err = c.Lease.KeepAliveOnce(ctx, l.ID) fmt.Println(lk, err) time.Sleep(time.Second) } } - log.Printf("lost lease %d, expire: %v\n", l.ID, expire) + log.Fatalf("lost lease %d, expire: %v\n", l.ID, expire) break } if err != nil { From debc69e1f2b1cb9193a6d8452ac0ea25c86c5ca8 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 25 Apr 2017 10:31:15 -0700 Subject: [PATCH 5/6] etcd-runner: pass in lock name as a command arg for lock_racer. --- .../etcd-runner/command/lock_racer_command.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/tools/functional-tester/etcd-runner/command/lock_racer_command.go b/tools/functional-tester/etcd-runner/command/lock_racer_command.go index d9544263a..b0ec491f4 100644 --- a/tools/functional-tester/etcd-runner/command/lock_racer_command.go +++ b/tools/functional-tester/etcd-runner/command/lock_racer_command.go @@ -20,24 +20,29 @@ import ( "fmt" "github.com/coreos/etcd/clientv3/concurrency" + "github.com/spf13/cobra" ) // NewLockRacerCommand returns the cobra command for "lock-racer runner". func NewLockRacerCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "lock-racer", + Use: "lock-racer [name of lock (defaults to 'racers')]", Short: "Performs lock race operation", Run: runRacerFunc, } - cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run") cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections") return cmd } func runRacerFunc(cmd *cobra.Command, args []string) { - if len(args) > 0 { - ExitWithError(ExitBadArgs, errors.New("lock-racer does not take any argument")) + racers := "racers" + if len(args) == 1 { + racers = args[0] + } + + if len(args) > 1 { + ExitWithError(ExitBadArgs, errors.New("lock-racer takes at most one argument")) } rcs := make([]roundClient, totalClientConnections) @@ -61,7 +66,7 @@ func runRacerFunc(cmd *cobra.Command, args []string) { break } } - m := concurrency.NewMutex(s, "racers") + m := concurrency.NewMutex(s, racers) rcs[i].acquire = func() error { return m.Lock(ctx) } rcs[i].validate = func() error { if cnt++; cnt != 1 { @@ -77,5 +82,7 @@ func runRacerFunc(cmd *cobra.Command, args []string) { return nil } } - doRounds(rcs, rounds) + // each client creates 1 key from NewMutex() and delete it from Unlock() + // a round involves in 2*len(rcs) requests. + doRounds(rcs, rounds, 2*len(rcs)) } From 77fbe10dfcdf2981f45688dbe9d49291dac3023d Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 25 Apr 2017 10:34:03 -0700 Subject: [PATCH 6/6] etcd-runner: add --prefix flag, allows inf round, and minor vars refactoring in watch runner. --- .../etcd-runner/command/watch_command.go | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tools/functional-tester/etcd-runner/command/watch_command.go b/tools/functional-tester/etcd-runner/command/watch_command.go index fe9ab279b..55cc0e673 100644 --- a/tools/functional-tester/etcd-runner/command/watch_command.go +++ b/tools/functional-tester/etcd-runner/command/watch_command.go @@ -24,10 +24,19 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/pkg/stringutil" + "github.com/spf13/cobra" "golang.org/x/time/rate" ) +var ( + runningTime time.Duration // time for which operation should be performed + noOfPrefixes int // total number of prefixes which will be watched upon + watchPerPrefix int // number of watchers per prefix + watchPrefix string // prefix append to keys in watcher + totalKeys int // total number of keys for operation +) + // NewWatchCommand returns the cobra command for "watcher runner". func NewWatchCommand() *cobra.Command { cmd := &cobra.Command{ @@ -35,12 +44,12 @@ func NewWatchCommand() *cobra.Command { Short: "Performs watch operation", Run: runWatcherFunc, } - cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run") cmd.Flags().DurationVar(&runningTime, "running-time", 60, "number of seconds to run") + cmd.Flags().StringVar(&watchPrefix, "prefix", "", "the prefix to append on all keys") cmd.Flags().IntVar(&noOfPrefixes, "total-prefixes", 10, "total no of prefixes to use") cmd.Flags().IntVar(&watchPerPrefix, "watch-per-prefix", 10, "number of watchers per prefix") - cmd.Flags().IntVar(&reqRate, "req-rate", 30, "rate at which put request will be performed") cmd.Flags().IntVar(&totalKeys, "total-keys", 1000, "total number of keys to watch") + return cmd } @@ -50,7 +59,7 @@ func runWatcherFunc(cmd *cobra.Command, args []string) { } ctx := context.Background() - for round := 0; round < rounds; round++ { + for round := 0; round < rounds || rounds <= 0; round++ { fmt.Println("round", round) performWatchOnPrefixes(ctx, cmd, round) } @@ -94,7 +103,7 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int) if err = limiter.Wait(ctxt); err != nil { return } - if err = putKeyAtMostOnce(ctxt, client, roundPrefix+"-"+prefix+"-"+key); err != nil { + if err = putKeyAtMostOnce(ctxt, client, watchPrefix+"-"+roundPrefix+"-"+prefix+"-"+key); err != nil { log.Fatalf("failed to put key: %v", err) return } @@ -112,15 +121,15 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int) rc := newClient(eps, dialTimeout) rcs = append(rcs, rc) - watchPrefix := roundPrefix + "-" + prefix + wprefix := watchPrefix + "-" + roundPrefix + "-" + prefix - wc := rc.Watch(ctxc, watchPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) + wc := rc.Watch(ctxc, wprefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) wcs = append(wcs, wc) wg.Add(1) go func() { defer wg.Done() - checkWatchResponse(wc, watchPrefix, keys) + checkWatchResponse(wc, wprefix, keys) }() } } @@ -139,7 +148,7 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int) rc.Close() } - if err = deletePrefix(ctx, client, roundPrefix); err != nil { + if err = deletePrefix(ctx, client, watchPrefix); err != nil { log.Fatalf("failed to clean up keys after test: %v", err) } } @@ -148,7 +157,7 @@ func checkWatchResponse(wc clientv3.WatchChan, prefix string, keys []string) { for n := 0; n < len(keys); { wr, more := <-wc if !more { - log.Fatalf("expect more keys (received %d/%d) for %s", len(keys), n, prefix) + log.Fatalf("expect more keys (received %d/%d) for %s", n, len(keys), prefix) } for _, event := range wr.Events { expectedKey := prefix + "-" + keys[n]