*: sync "functional-tester" from 3.2 branch

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-03-08 00:57:30 -08:00
parent 0874fcbed4
commit 290fa0c1be
24 changed files with 350 additions and 1021 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2016 The etcd Authors
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,5 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// etcd-runner is a program for testing etcd clientv3 features against a fault injected cluster.
package main
// Package debugutil includes utility functions for debugging.
package debugutil

47
pkg/debugutil/pprof.go Normal file
View File

@ -0,0 +1,47 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package debugutil
import (
"net/http"
"net/http/pprof"
"runtime"
)
const HTTPPrefixPProf = "/debug/pprof"
// PProfHandlers returns a map of pprof handlers keyed by the HTTP path.
func PProfHandlers() map[string]http.Handler {
// set only when there's no existing setting
if runtime.SetMutexProfileFraction(-1) == 0 {
// 1 out of 5 mutex events are reported, on average
runtime.SetMutexProfileFraction(5)
}
m := make(map[string]http.Handler)
m[HTTPPrefixPProf+"/"] = http.HandlerFunc(pprof.Index)
m[HTTPPrefixPProf+"/profile"] = http.HandlerFunc(pprof.Profile)
m[HTTPPrefixPProf+"/symbol"] = http.HandlerFunc(pprof.Symbol)
m[HTTPPrefixPProf+"/cmdline"] = http.HandlerFunc(pprof.Cmdline)
m[HTTPPrefixPProf+"/trace "] = http.HandlerFunc(pprof.Trace)
m[HTTPPrefixPProf+"/heap"] = pprof.Handler("heap")
m[HTTPPrefixPProf+"/goroutine"] = pprof.Handler("goroutine")
m[HTTPPrefixPProf+"/threadcreate"] = pprof.Handler("threadcreate")
m[HTTPPrefixPProf+"/block"] = pprof.Handler("block")
m[HTTPPrefixPProf+"/mutex"] = pprof.Handler("mutex")
return m
}

View File

@ -34,7 +34,7 @@ Running the script requires:
Notes:
- Docker image is based on Alpine Linux OS running in privileged mode to allow iptables manipulation.
- To specify testing parameters (etcd-tester arguments) modify tools/functional-tester/docker/docker-compose.yml or start etcd-tester manually
- (OSX) make sure that etcd binary is built for linux/amd64 (eg. `rm bin/etcd;GOOS=linux GOARCH=amd64 ./tools/functional-tester/test`) otherwise you get `exec format error`
- (OSX) make sure that etcd binary is built for linux/amd64 (eg. `rm bin/etcd;GOOS=linux GOARCH=amd64 ./tools/functional-tester/test`) otherwise it will return `exec format error`
## with Goreman

View File

@ -15,7 +15,6 @@
package main
import (
"fmt"
"os"
"os/exec"
"path/filepath"
@ -75,6 +74,7 @@ func newAgent(cfg AgentConfig) (*Agent, error) {
// start starts a new etcd process with the given args.
func (a *Agent) start(args ...string) error {
args = append(args, "--data-dir", a.dataDir())
a.cmd = exec.Command(a.cmd.Path, args...)
a.cmd.Env = []string{"GOFAIL_HTTP=" + a.cfg.FailpointAddr}
a.cmd.Stdout = a.logfile
@ -206,16 +206,7 @@ func (a *Agent) status() client.Status {
}
func (a *Agent) dataDir() string {
datadir := filepath.Join(a.cmd.Path, "*.etcd")
args := a.cmd.Args
// only parse the simple case like "--data-dir /var/lib/etcd"
for i, arg := range args {
if arg == "--data-dir" {
datadir = args[i+1]
break
}
}
return datadir
return filepath.Join(a.cfg.LogDir, "etcd.data")
}
func existDir(fpath string) bool {
@ -231,14 +222,14 @@ func existDir(fpath string) bool {
}
func archiveLogAndDataDir(logDir string, datadir string) error {
dir := filepath.Join("failure_archive", fmt.Sprint(time.Now().Format(time.RFC3339)))
dir := filepath.Join(logDir, "failure_archive", time.Now().Format(time.RFC3339))
if existDir(dir) {
dir = filepath.Join("failure_archive", fmt.Sprint(time.Now().Add(time.Second).Format(time.RFC3339)))
dir = filepath.Join(logDir, "failure_archive", time.Now().Add(time.Second).Format(time.RFC3339))
}
if err := fileutil.TouchDirAll(dir); err != nil {
return err
}
if err := os.Rename(logDir, filepath.Join(dir, filepath.Base(logDir))); err != nil {
if err := os.Rename(filepath.Join(logDir, "etcd.log"), filepath.Join(dir, "etcd.log")); err != nil {
if !os.IsNotExist(err) {
return err
}

View File

@ -27,7 +27,7 @@ var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-agent")
func main() {
etcdPath := flag.String("etcd-path", filepath.Join(os.Getenv("GOPATH"), "bin/etcd"), "the path to etcd binary")
etcdLogDir := flag.String("etcd-log-dir", "etcd-log", "directory to store etcd logs")
etcdLogDir := flag.String("etcd-log-dir", "etcd-log", "directory to store etcd logs, data directories, failure archive")
port := flag.String("port", ":9027", "port to serve agent server")
useRoot := flag.Bool("use-root", true, "use root permissions")
failpointAddr := flag.String("failpoint-addr", ":2381", "interface for gofail's HTTP server")

View File

@ -1,126 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"context"
"errors"
"fmt"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/spf13/cobra"
)
// NewElectionCommand returns the cobra command for "election runner".
func NewElectionCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "election",
Short: "Performs election operation",
Run: runElectionFunc,
}
cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
return cmd
}
func runElectionFunc(cmd *cobra.Command, args []string) {
if len(args) > 0 {
ExitWithError(ExitBadArgs, errors.New("election does not take any argument"))
}
rcs := make([]roundClient, totalClientConnections)
validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs))
for range rcs {
releasec <- struct{}{}
}
eps := endpointsFromFlag(cmd)
dialTimeout := dialTimeoutFromCmd(cmd)
for i := range rcs {
v := fmt.Sprintf("%d", i)
observedLeader := ""
validateWaiters := 0
rcs[i].c = newClient(eps, dialTimeout)
var (
s *concurrency.Session
err error
)
for {
s, err = concurrency.NewSession(rcs[i].c)
if err == nil {
break
}
}
e := concurrency.NewElection(s, "electors")
rcs[i].acquire = func() error {
<-releasec
ctx, cancel := context.WithCancel(context.Background())
go func() {
if ol, ok := <-e.Observe(ctx); ok {
observedLeader = string(ol.Kvs[0].Value)
if observedLeader != v {
cancel()
}
}
}()
err = e.Campaign(ctx, v)
if err == nil {
observedLeader = v
}
if observedLeader == v {
validateWaiters = len(rcs)
}
select {
case <-ctx.Done():
return nil
default:
cancel()
return err
}
}
rcs[i].validate = func() error {
if l, err := e.Leader(context.TODO()); err == nil && l != observedLeader {
return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
}
validatec <- struct{}{}
return nil
}
rcs[i].release = func() error {
for validateWaiters > 0 {
select {
case <-validatec:
validateWaiters--
default:
return fmt.Errorf("waiting on followers")
}
}
if err := e.Resign(context.TODO()); err != nil {
return err
}
if observedLeader == v {
for range rcs {
releasec <- struct{}{}
}
}
observedLeader = ""
return nil
}
}
doRounds(rcs, rounds)
}

View File

@ -1,42 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"fmt"
"os"
"github.com/coreos/etcd/client"
)
const (
// http://tldp.org/LDP/abs/html/exitcodes.html
ExitSuccess = iota
ExitError
ExitBadConnection
ExitInvalidInput // for txn, watch command
ExitBadFeature // provided a valid flag with an unsupported value
ExitInterrupted
ExitIO
ExitBadArgs = 128
)
func ExitWithError(code int, err error) {
fmt.Fprintln(os.Stderr, "Error: ", err)
if cerr, ok := err.(*client.ClusterError); ok {
fmt.Fprintln(os.Stderr, cerr.Detail())
}
os.Exit(code)
}

View File

@ -1,130 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"fmt"
"log"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/spf13/cobra"
)
var (
rounds int // total number of rounds the operation needs to be performed
totalClientConnections int // total number of client connections to be made with server
noOfPrefixes int // total number of prefixes which will be watched upon
watchPerPrefix int // number of watchers per prefix
reqRate int // put request per second
totalKeys int // total number of keys for operation
runningTime time.Duration // time for which operation should be performed
)
// GlobalFlags are flags that defined globally
// and are inherited to all sub-commands.
type GlobalFlags struct {
Endpoints []string
DialTimeout time.Duration
}
type roundClient struct {
c *clientv3.Client
progress int
acquire func() error
validate func() error
release func() error
}
func newClient(eps []string, timeout time.Duration) *clientv3.Client {
c, err := clientv3.New(clientv3.Config{
Endpoints: eps,
DialTimeout: time.Duration(timeout) * time.Second,
})
if err != nil {
log.Fatal(err)
}
return c
}
func doRounds(rcs []roundClient, rounds int) {
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(len(rcs))
finished := make(chan struct{}, 0)
for i := range rcs {
go func(rc *roundClient) {
defer wg.Done()
for rc.progress < rounds {
for rc.acquire() != nil { /* spin */
}
mu.Lock()
if err := rc.validate(); err != nil {
log.Fatal(err)
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
rc.progress++
finished <- struct{}{}
mu.Lock()
for rc.release() != nil {
mu.Unlock()
mu.Lock()
}
mu.Unlock()
}
}(&rcs[i])
}
start := time.Now()
for i := 1; i < len(rcs)*rounds+1; i++ {
select {
case <-finished:
if i%100 == 0 {
fmt.Printf("finished %d, took %v\n", i, time.Since(start))
start = time.Now()
}
case <-time.After(time.Minute):
log.Panic("no progress after 1 minute!")
}
}
wg.Wait()
for _, rc := range rcs {
rc.c.Close()
}
}
func endpointsFromFlag(cmd *cobra.Command) []string {
endpoints, err := cmd.Flags().GetStringSlice("endpoints")
if err != nil {
ExitWithError(ExitError, err)
}
return endpoints
}
func dialTimeoutFromCmd(cmd *cobra.Command) time.Duration {
dialTimeout, err := cmd.Flags().GetDuration("dial-timeout")
if err != nil {
ExitWithError(ExitError, err)
}
return dialTimeout
}

View File

@ -1,86 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// NewLeaseRenewerCommand returns the cobra command for "lease-renewer runner".
func NewLeaseRenewerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "lease-renewer",
Short: "Performs lease renew operation",
Run: runLeaseRenewerFunc,
}
return cmd
}
func runLeaseRenewerFunc(cmd *cobra.Command, args []string) {
if len(args) > 0 {
ExitWithError(ExitBadArgs, errors.New("lease-renewer does not take any argument"))
}
eps := endpointsFromFlag(cmd)
dialTimeout := dialTimeoutFromCmd(cmd)
c := newClient(eps, dialTimeout)
ctx := context.Background()
for {
var (
l *clientv3.LeaseGrantResponse
lk *clientv3.LeaseKeepAliveResponse
err error
)
for {
l, err = c.Lease.Grant(ctx, 5)
if err == nil {
break
}
}
expire := time.Now().Add(time.Duration(l.TTL-1) * time.Second)
for {
lk, err = c.Lease.KeepAliveOnce(ctx, l.ID)
if grpc.Code(err) == codes.NotFound {
if time.Since(expire) < 0 {
log.Printf("bad renew! exceeded: %v", time.Since(expire))
for {
lk, err = c.Lease.KeepAliveOnce(ctx, l.ID)
fmt.Println(lk, err)
time.Sleep(time.Second)
}
}
log.Printf("lost lease %d, expire: %v\n", l.ID, expire)
break
}
if err != nil {
continue
}
expire = time.Now().Add(time.Duration(lk.TTL-1) * time.Second)
log.Printf("renewed lease %d, expire: %v\n", lk.ID, expire)
time.Sleep(time.Duration(lk.TTL-2) * time.Second)
}
}
}

View File

@ -1,81 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"context"
"errors"
"fmt"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/spf13/cobra"
)
// NewLockRacerCommand returns the cobra command for "lock-racer runner".
func NewLockRacerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "lock-racer",
Short: "Performs lock race operation",
Run: runRacerFunc,
}
cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
return cmd
}
func runRacerFunc(cmd *cobra.Command, args []string) {
if len(args) > 0 {
ExitWithError(ExitBadArgs, errors.New("lock-racer does not take any argument"))
}
rcs := make([]roundClient, totalClientConnections)
ctx := context.Background()
cnt := 0
eps := endpointsFromFlag(cmd)
dialTimeout := dialTimeoutFromCmd(cmd)
for i := range rcs {
var (
s *concurrency.Session
err error
)
rcs[i].c = newClient(eps, dialTimeout)
for {
s, err = concurrency.NewSession(rcs[i].c)
if err == nil {
break
}
}
m := concurrency.NewMutex(s, "racers")
rcs[i].acquire = func() error { return m.Lock(ctx) }
rcs[i].validate = func() error {
if cnt++; cnt != 1 {
return fmt.Errorf("bad lock; count: %d", cnt)
}
return nil
}
rcs[i].release = func() error {
if err := m.Unlock(ctx); err != nil {
return err
}
cnt = 0
return nil
}
}
doRounds(rcs, rounds)
}

View File

@ -1,202 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/stringutil"
"github.com/spf13/cobra"
"golang.org/x/time/rate"
)
// NewWatchCommand returns the cobra command for "watcher runner".
func NewWatchCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "watcher",
Short: "Performs watch operation",
Run: runWatcherFunc,
}
cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
cmd.Flags().DurationVar(&runningTime, "running-time", 60, "number of seconds to run")
cmd.Flags().IntVar(&noOfPrefixes, "total-prefixes", 10, "total no of prefixes to use")
cmd.Flags().IntVar(&watchPerPrefix, "watch-per-prefix", 10, "number of watchers per prefix")
cmd.Flags().IntVar(&reqRate, "req-rate", 30, "rate at which put request will be performed")
cmd.Flags().IntVar(&totalKeys, "total-keys", 1000, "total number of keys to watch")
return cmd
}
func runWatcherFunc(cmd *cobra.Command, args []string) {
if len(args) > 0 {
ExitWithError(ExitBadArgs, errors.New("watcher does not take any argument"))
}
ctx := context.Background()
for round := 0; round < rounds; round++ {
fmt.Println("round", round)
performWatchOnPrefixes(ctx, cmd, round)
}
}
func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int) {
keyPerPrefix := totalKeys / noOfPrefixes
prefixes := stringutil.UniqueStrings(5, noOfPrefixes)
keys := stringutil.RandomStrings(10, keyPerPrefix)
roundPrefix := fmt.Sprintf("%16x", round)
eps := endpointsFromFlag(cmd)
dialTimeout := dialTimeoutFromCmd(cmd)
var (
revision int64
wg sync.WaitGroup
gr *clientv3.GetResponse
err error
)
client := newClient(eps, dialTimeout)
defer client.Close()
gr, err = getKey(ctx, client, "non-existent")
if err != nil {
log.Fatalf("failed to get the initial revision: %v", err)
}
revision = gr.Header.Revision
ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime*time.Second))
defer cancel()
// generate and put keys in cluster
limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
go func() {
for _, key := range keys {
for _, prefix := range prefixes {
if err = limiter.Wait(ctxt); err != nil {
return
}
if err = putKeyAtMostOnce(ctxt, client, roundPrefix+"-"+prefix+"-"+key); err != nil {
log.Fatalf("failed to put key: %v", err)
return
}
}
}
}()
ctxc, cancelc := context.WithCancel(ctx)
wcs := make([]clientv3.WatchChan, 0)
rcs := make([]*clientv3.Client, 0)
for _, prefix := range prefixes {
for j := 0; j < watchPerPrefix; j++ {
rc := newClient(eps, dialTimeout)
rcs = append(rcs, rc)
watchPrefix := roundPrefix + "-" + prefix
wc := rc.Watch(ctxc, watchPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
wcs = append(wcs, wc)
wg.Add(1)
go func() {
defer wg.Done()
checkWatchResponse(wc, watchPrefix, keys)
}()
}
}
wg.Wait()
cancelc()
// verify all watch channels are closed
for e, wc := range wcs {
if _, ok := <-wc; ok {
log.Fatalf("expected wc to be closed, but received %v", e)
}
}
for _, rc := range rcs {
rc.Close()
}
if err = deletePrefix(ctx, client, roundPrefix); err != nil {
log.Fatalf("failed to clean up keys after test: %v", err)
}
}
func checkWatchResponse(wc clientv3.WatchChan, prefix string, keys []string) {
for n := 0; n < len(keys); {
wr, more := <-wc
if !more {
log.Fatalf("expect more keys (received %d/%d) for %s", len(keys), n, prefix)
}
for _, event := range wr.Events {
expectedKey := prefix + "-" + keys[n]
receivedKey := string(event.Kv.Key)
if expectedKey != receivedKey {
log.Fatalf("expected key %q, got %q for prefix : %q\n", expectedKey, receivedKey, prefix)
}
n++
}
}
}
func putKeyAtMostOnce(ctx context.Context, client *clientv3.Client, key string) error {
gr, err := getKey(ctx, client, key)
if err != nil {
return err
}
var modrev int64
if len(gr.Kvs) > 0 {
modrev = gr.Kvs[0].ModRevision
}
for ctx.Err() == nil {
_, err := client.Txn(ctx).If(clientv3.Compare(clientv3.ModRevision(key), "=", modrev)).Then(clientv3.OpPut(key, key)).Commit()
if err == nil {
return nil
}
}
return ctx.Err()
}
func deletePrefix(ctx context.Context, client *clientv3.Client, key string) error {
for ctx.Err() == nil {
if _, err := client.Delete(ctx, key, clientv3.WithPrefix()); err == nil {
return nil
}
}
return ctx.Err()
}
func getKey(ctx context.Context, client *clientv3.Client, key string) (*clientv3.GetResponse, error) {
for ctx.Err() == nil {
if gr, err := client.Get(ctx, key); err == nil {
return gr, nil
}
}
return nil, ctx.Err()
}

View File

@ -1,174 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// copied from https://github.com/coreos/rkt/blob/master/rkt/help.go
package main
import (
"bytes"
"fmt"
"io"
"os"
"strings"
"text/tabwriter"
"text/template"
"github.com/coreos/etcd/version"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
var (
commandUsageTemplate *template.Template
templFuncs = template.FuncMap{
"descToLines": func(s string) []string {
// trim leading/trailing whitespace and split into slice of lines
return strings.Split(strings.Trim(s, "\n\t "), "\n")
},
"cmdName": func(cmd *cobra.Command, startCmd *cobra.Command) string {
parts := []string{cmd.Name()}
for cmd.HasParent() && cmd.Parent().Name() != startCmd.Name() {
cmd = cmd.Parent()
parts = append([]string{cmd.Name()}, parts...)
}
return strings.Join(parts, " ")
},
}
)
func init() {
commandUsage := `
{{ $cmd := .Cmd }}\
{{ $cmdname := cmdName .Cmd .Cmd.Root }}\
NAME:
{{ if not .Cmd.HasParent }}\
{{printf "\t%s - %s" .Cmd.Name .Cmd.Short}}
{{else}}\
{{printf "\t%s - %s" $cmdname .Cmd.Short}}
{{end}}\
USAGE:
{{printf "\t%s" .Cmd.UseLine}}
{{ if not .Cmd.HasParent }}\
VERSION:
{{printf "\t%s" .Version}}
{{end}}\
{{if .Cmd.HasSubCommands}}\
API VERSION:
{{printf "\t%s" .APIVersion}}
{{end}}\
{{if .Cmd.HasSubCommands}}\
COMMANDS:
{{range .SubCommands}}\
{{ $cmdname := cmdName . $cmd }}\
{{ if .Runnable }}\
{{printf "\t%s\t%s" $cmdname .Short}}
{{end}}\
{{end}}\
{{end}}\
{{ if .Cmd.Long }}\
DESCRIPTION:
{{range $line := descToLines .Cmd.Long}}{{printf "\t%s" $line}}
{{end}}\
{{end}}\
{{if .Cmd.HasLocalFlags}}\
OPTIONS:
{{.LocalFlags}}\
{{end}}\
{{if .Cmd.HasInheritedFlags}}\
GLOBAL OPTIONS:
{{.GlobalFlags}}\
{{end}}
`[1:]
commandUsageTemplate = template.Must(template.New("command_usage").Funcs(templFuncs).Parse(strings.Replace(commandUsage, "\\\n", "", -1)))
}
func etcdFlagUsages(flagSet *pflag.FlagSet) string {
x := new(bytes.Buffer)
flagSet.VisitAll(func(flag *pflag.Flag) {
if len(flag.Deprecated) > 0 {
return
}
format := ""
if len(flag.Shorthand) > 0 {
format = " -%s, --%s"
} else {
format = " %s --%s"
}
if len(flag.NoOptDefVal) > 0 {
format = format + "["
}
if flag.Value.Type() == "string" {
// put quotes on the value
format = format + "=%q"
} else {
format = format + "=%s"
}
if len(flag.NoOptDefVal) > 0 {
format = format + "]"
}
format = format + "\t%s\n"
shorthand := flag.Shorthand
fmt.Fprintf(x, format, shorthand, flag.Name, flag.DefValue, flag.Usage)
})
return x.String()
}
func getSubCommands(cmd *cobra.Command) []*cobra.Command {
var subCommands []*cobra.Command
for _, subCmd := range cmd.Commands() {
subCommands = append(subCommands, subCmd)
subCommands = append(subCommands, getSubCommands(subCmd)...)
}
return subCommands
}
func usageFunc(cmd *cobra.Command) error {
subCommands := getSubCommands(cmd)
tabOut := getTabOutWithWriter(os.Stdout)
commandUsageTemplate.Execute(tabOut, struct {
Cmd *cobra.Command
LocalFlags string
GlobalFlags string
SubCommands []*cobra.Command
Version string
APIVersion string
}{
cmd,
etcdFlagUsages(cmd.LocalFlags()),
etcdFlagUsages(cmd.InheritedFlags()),
subCommands,
version.Version,
version.APIVersion,
})
tabOut.Flush()
return nil
}
func getTabOutWithWriter(writer io.Writer) *tabwriter.Writer {
aTabOut := new(tabwriter.Writer)
aTabOut.Init(writer, 0, 8, 1, '\t', 0)
return aTabOut
}

View File

@ -1,75 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// etcd-runner is a command line application that performs tests on etcd.
package main
import (
"log"
"time"
"github.com/coreos/etcd/tools/functional-tester/etcd-runner/command"
"github.com/spf13/cobra"
)
const (
cliName = "etcd-runner"
cliDescription = "Stress tests using clientv3 functionality.."
defaultDialTimeout = 2 * time.Second
)
var (
globalFlags = command.GlobalFlags{}
)
var (
rootCmd = &cobra.Command{
Use: cliName,
Short: cliDescription,
SuggestFor: []string{"etcd-runner"},
}
)
func init() {
log.SetFlags(log.Lmicroseconds)
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")
rootCmd.AddCommand(
command.NewElectionCommand(),
command.NewLeaseRenewerCommand(),
command.NewLockRacerCommand(),
command.NewWatchCommand(),
)
}
func init() {
cobra.EnablePrefixMatching = true
}
func Start() {
rootCmd.SetUsageFunc(usageFunc)
// Make help just show the usage
rootCmd.SetHelpTemplate(`{{.UsageString}}`)
if err := rootCmd.Execute(); err != nil {
command.ExitWithError(command.ExitError, err)
}
}
func main() {
Start()
}

View File

@ -133,7 +133,8 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
var resp *pb.LeaseTimeToLiveResponse
for i := 0; i < retries; i++ {
resp, err = lc.getLeaseByID(ctx, leaseID)
if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
// lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound
if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) {
return nil
}
if err != nil {
@ -195,12 +196,14 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
// keep retrying until lease's state is known or ctx is being canceled
for ctx.Err() == nil {
resp, err := lc.getLeaseByID(ctx, leaseID)
if err == nil {
return false, nil
}
if err != nil {
// for ~v3.1 compatibilities
if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
return true, nil
}
} else {
return resp.TTL == -1, nil
}
plog.Warningf("hasLeaseExpired %v resp %v error %v (endpoint %q)", leaseID, resp, err, lc.endpoint)
}
return false, ctx.Err()
@ -242,6 +245,19 @@ func (cchecker *compositeChecker) Check() error {
return errsToError(errs)
}
type runnerChecker struct {
errc chan error
}
func (rc *runnerChecker) Check() error {
select {
case err := <-rc.errc:
return err
default:
return nil
}
}
type noChecker struct{}
func newNoChecker() Checker { return &noChecker{} }

View File

@ -34,8 +34,6 @@ type agentConfig struct {
clientPort int
peerPort int
failpointPort int
datadir string
}
type cluster struct {
@ -78,9 +76,9 @@ func (c *cluster) bootstrap() error {
for i, m := range members {
flags := append(
m.Flags(),
"--data-dir", c.agents[i].datadir,
"--initial-cluster-token", token,
"--initial-cluster", clusterStr)
"--initial-cluster", clusterStr,
"--snapshot-count", "10000")
if _, err := m.Agent.Start(flags...); err != nil {
// cleanup

View File

@ -0,0 +1,97 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"io/ioutil"
"os/exec"
"syscall"
"golang.org/x/time/rate"
)
type runnerStresser struct {
cmd *exec.Cmd
cmdStr string
args []string
rl *rate.Limiter
reqRate int
errc chan error
donec chan struct{}
}
func newRunnerStresser(cmdStr string, args []string, rl *rate.Limiter, reqRate int) *runnerStresser {
rl.SetLimit(rl.Limit() - rate.Limit(reqRate))
return &runnerStresser{
cmdStr: cmdStr,
args: args,
rl: rl,
reqRate: reqRate,
errc: make(chan error, 1),
donec: make(chan struct{}),
}
}
func (rs *runnerStresser) setupOnce() (err error) {
if rs.cmd != nil {
return nil
}
rs.cmd = exec.Command(rs.cmdStr, rs.args...)
stderr, err := rs.cmd.StderrPipe()
if err != nil {
return err
}
go func() {
defer close(rs.donec)
out, err := ioutil.ReadAll(stderr)
if err != nil {
rs.errc <- err
} else {
rs.errc <- fmt.Errorf("(%v %v) stderr %v", rs.cmdStr, rs.args, string(out))
}
}()
return rs.cmd.Start()
}
func (rs *runnerStresser) Stress() (err error) {
if err = rs.setupOnce(); err != nil {
return err
}
return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT)
}
func (rs *runnerStresser) Pause() {
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP)
}
func (rs *runnerStresser) Close() {
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT)
rs.cmd.Wait()
<-rs.donec
rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate))
}
func (rs *runnerStresser) ModifiedKeys() int64 {
return 1
}
func (rs *runnerStresser) Checker() Checker {
return &runnerChecker{rs.errc}
}

View File

@ -32,7 +32,7 @@ type failpointStats struct {
var fpStats failpointStats
func failpointFailures(c *cluster) (ret []failure, err error) {
func failpointFailures(c *cluster, failpoints []string) (ret []failure, err error) {
var fps []string
fps, err = failpointPaths(c.Members[0].FailpointURL)
if err != nil {
@ -43,7 +43,7 @@ func failpointFailures(c *cluster) (ret []failure, err error) {
if len(fp) == 0 {
continue
}
fpFails := failuresFromFailpoint(fp)
fpFails := failuresFromFailpoint(fp, failpoints)
// wrap in delays so failpoint has time to trigger
for i, fpf := range fpFails {
if strings.Contains(fp, "Snap") {
@ -77,34 +77,39 @@ func failpointPaths(endpoint string) ([]string, error) {
return fps, nil
}
func failuresFromFailpoint(fp string) []failure {
inject := makeInjectFailpoint(fp, `panic("etcd-tester")`)
// failpoints follows FreeBSD KFAIL_POINT syntax.
// e.g. panic("etcd-tester"),1*sleep(1000)->panic("etcd-tester")
func failuresFromFailpoint(fp string, failpoints []string) (fs []failure) {
recov := makeRecoverFailpoint(fp)
return []failure{
for _, failpoint := range failpoints {
inject := makeInjectFailpoint(fp, failpoint)
fs = append(fs, []failure{
&failureOne{
description: description("failpoint " + fp + " panic one"),
description: description(fmt.Sprintf("failpoint %s (one: %s)", fp, failpoint)),
injectMember: inject,
recoverMember: recov,
},
&failureAll{
description: description("failpoint " + fp + " panic all"),
description: description(fmt.Sprintf("failpoint %s (all: %s)", fp, failpoint)),
injectMember: inject,
recoverMember: recov,
},
&failureMajority{
description: description("failpoint " + fp + " panic majority"),
description: description(fmt.Sprintf("failpoint %s (majority: %s)", fp, failpoint)),
injectMember: inject,
recoverMember: recov,
},
&failureLeader{
failureByFunc{
description: description("failpoint " + fp + " panic leader"),
description: description(fmt.Sprintf("failpoint %s (leader: %s)", fp, failpoint)),
injectMember: inject,
recoverMember: recov,
},
0,
},
}...)
}
return fs
}
func makeInjectFailpoint(fp, val string) injectMemberFunc {

View File

@ -140,11 +140,16 @@ func (s *keyStresser) run(ctx context.Context) {
}
}
func (s *keyStresser) Cancel() {
func (s *keyStresser) Pause() {
s.Close()
}
func (s *keyStresser) Close() {
s.cancel()
s.conn.Close()
s.wg.Wait()
plog.Infof("keyStresser %q is canceled", s.Endpoint)
plog.Infof("keyStresser %q is closed", s.Endpoint)
}
func (s *keyStresser) ModifiedKeys() int64 {

View File

@ -361,13 +361,17 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
return false, ls.ctx.Err()
}
func (ls *leaseStresser) Cancel() {
plog.Debugf("lease stresser %q is canceling...", ls.endpoint)
func (ls *leaseStresser) Pause() {
ls.Close()
}
func (ls *leaseStresser) Close() {
plog.Debugf("lease stresser %q is closing...", ls.endpoint)
ls.cancel()
ls.runWg.Wait()
ls.aliveWg.Wait()
ls.conn.Close()
plog.Infof("lease stresser %q is canceled", ls.endpoint)
plog.Infof("lease stresser %q is closed", ls.endpoint)
}
func (ls *leaseStresser) ModifiedKeys() int64 {

View File

@ -18,10 +18,11 @@ import (
"flag"
"fmt"
"net/http"
"net/http/pprof"
"os"
"strings"
"github.com/coreos/etcd/pkg/debugutil"
"github.com/coreos/pkg/capnslog"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
@ -35,24 +36,24 @@ const (
defaultFailpointPort = 2381
)
const pprofPrefix = "/debug/pprof-tester"
func main() {
endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.")
clientPorts := flag.String("client-ports", "", "etcd client port for each agent endpoint")
peerPorts := flag.String("peer-ports", "", "etcd peer port for each agent endpoint")
failpointPorts := flag.String("failpoint-ports", "", "etcd failpoint port for each agent endpoint")
datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.")
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
schedCases := flag.String("schedule-cases", "", "test case schedule")
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop).")
stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop, election-runner, watch-runner, lock-racer-runner, lease-runner).")
etcdRunnerPath := flag.String("etcd-runner", "", "specify a path of etcd runner binary")
failureTypes := flag.String("failures", "default,failpoints", "specify failures (concat of \"default\" and \"failpoints\").")
failpoints := flag.String("failpoints", `panic("etcd-tester")`, `comma separated list of failpoint terms to inject (e.g. 'panic("etcd-tester"),1*sleep(1000)')`)
externalFailures := flag.String("external-failures", "", "specify a path of script for enabling/disabling an external fault injector")
enablePprof := flag.Bool("enable-pprof", false, "true to enable pprof")
flag.Parse()
@ -68,7 +69,6 @@ func main() {
agents[i].clientPort = cports[i]
agents[i].peerPort = pports[i]
agents[i].failpointPort = fports[i]
agents[i].datadir = *datadir
}
c := &cluster{agents: agents}
@ -83,7 +83,8 @@ func main() {
var failures []failure
if failureTypes != nil && *failureTypes != "" {
failures = makeFailures(*failureTypes, c)
types, failpoints := strings.Split(*failureTypes, ","), strings.Split(*failpoints, ",")
failures = makeFailures(types, failpoints, c)
}
if externalFailures != nil && *externalFailures != "" {
@ -120,12 +121,15 @@ func main() {
keySuffixRange: int(*stressKeySuffixRange),
numLeases: 10,
keysPerLease: 10,
etcdRunnerPath: *etcdRunnerPath,
}
t := &tester{
failures: schedule,
cluster: c,
limit: *limit,
exitOnFailure: *exitOnFailure,
scfg: scfg,
stresserType: *stresserType,
@ -137,15 +141,9 @@ func main() {
http.Handle("/metrics", prometheus.Handler())
if *enablePprof {
http.Handle(pprofPrefix+"/", http.HandlerFunc(pprof.Index))
http.Handle(pprofPrefix+"/profile", http.HandlerFunc(pprof.Profile))
http.Handle(pprofPrefix+"/symbol", http.HandlerFunc(pprof.Symbol))
http.Handle(pprofPrefix+"/cmdline", http.HandlerFunc(pprof.Cmdline))
http.Handle(pprofPrefix+"/trace", http.HandlerFunc(pprof.Trace))
http.Handle(pprofPrefix+"/heap", pprof.Handler("heap"))
http.Handle(pprofPrefix+"/goroutine", pprof.Handler("goroutine"))
http.Handle(pprofPrefix+"/threadcreate", pprof.Handler("threadcreate"))
http.Handle(pprofPrefix+"/block", pprof.Handler("block"))
for p, h := range debugutil.PProfHandlers() {
http.Handle(p, h)
}
}
go func() { plog.Fatal(http.ListenAndServe(":9028", nil)) }()
@ -176,12 +174,10 @@ func portsFromArg(arg string, n, defaultPort int) []int {
return ret
}
func makeFailures(types string, c *cluster) []failure {
func makeFailures(types, failpoints []string, c *cluster) []failure {
var failures []failure
fails := strings.Split(types, ",")
for i := range fails {
switch fails[i] {
for i := range types {
switch types[i] {
case "default":
defaultFailures := []failure{
newFailureKillAll(),
@ -199,14 +195,14 @@ func makeFailures(types string, c *cluster) []failure {
failures = append(failures, defaultFailures...)
case "failpoints":
fpFailures, fperr := failpointFailures(c)
fpFailures, fperr := failpointFailures(c, failpoints)
if len(fpFailures) == 0 {
plog.Infof("no failpoints found (%v)", fperr)
}
failures = append(failures, fpFailures...)
default:
plog.Errorf("unknown failure: %s\n", fails[i])
plog.Errorf("unknown failure: %s\n", types[i])
os.Exit(1)
}
}

View File

@ -79,7 +79,10 @@ func (m *member) Defrag() error {
return err
}
defer cli.Close()
if _, err = cli.Defragment(context.Background(), m.ClientURL); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
_, err = cli.Defragment(ctx, m.ClientURL)
cancel()
if err != nil {
return err
}
plog.Printf("defragmented %s\n", m.ClientURL)

View File

@ -15,21 +15,21 @@
package main
import (
"fmt"
"strings"
"sync"
"time"
"golang.org/x/time/rate"
"google.golang.org/grpc/grpclog"
)
func init() { grpclog.SetLogger(plog) }
type Stresser interface {
// Stress starts to stress the etcd cluster
Stress() error
// Cancel cancels the stress test on the etcd cluster
Cancel()
// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
Pause()
// Close releases all of the Stresser's resources.
Close()
// ModifiedKeys reports the number of keys created and deleted by stresser
ModifiedKeys() int64
// Checker returns an invariant checker for after the stresser is canceled.
@ -43,7 +43,8 @@ type nopStresser struct {
}
func (s *nopStresser) Stress() error { return nil }
func (s *nopStresser) Cancel() {}
func (s *nopStresser) Pause() {}
func (s *nopStresser) Close() {}
func (s *nopStresser) ModifiedKeys() int64 {
return 0
}
@ -59,7 +60,7 @@ func (cs *compositeStresser) Stress() error {
for i, s := range cs.stressers {
if err := s.Stress(); err != nil {
for j := 0; j < i; j++ {
cs.stressers[i].Cancel()
cs.stressers[i].Close()
}
return err
}
@ -67,13 +68,25 @@ func (cs *compositeStresser) Stress() error {
return nil
}
func (cs *compositeStresser) Cancel() {
func (cs *compositeStresser) Pause() {
var wg sync.WaitGroup
wg.Add(len(cs.stressers))
for i := range cs.stressers {
go func(s Stresser) {
defer wg.Done()
s.Cancel()
s.Pause()
}(cs.stressers[i])
}
wg.Wait()
}
func (cs *compositeStresser) Close() {
var wg sync.WaitGroup
wg.Add(len(cs.stressers))
for i := range cs.stressers {
go func(s Stresser) {
defer wg.Done()
s.Close()
}(cs.stressers[i])
}
wg.Wait()
@ -108,6 +121,8 @@ type stressConfig struct {
keysPerLease int
rateLimiter *rate.Limiter
etcdRunnerPath string
}
// NewStresser creates stresser from a comma separated list of stresser types.
@ -149,6 +164,49 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
keysPerLease: sc.keysPerLease,
rateLimiter: sc.rateLimiter,
}
case "election-runner":
reqRate := 100
args := []string{
"election",
fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
"--dial-timeout=10s",
"--endpoints", m.grpcAddr(),
"--total-client-connections=10",
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
case "watch-runner":
reqRate := 100
args := []string{
"watcher",
"--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
"--total-keys=1",
"--total-prefixes=1",
"--watch-per-prefix=1",
"--endpoints", m.grpcAddr(),
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
case "lock-racer-runner":
reqRate := 100
args := []string{
"lock-racer",
fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
"--endpoints", m.grpcAddr(),
"--total-client-connections=10",
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
case "lease-runner":
args := []string{
"lease-renewer",
"--ttl=30",
"--endpoints", m.grpcAddr(),
}
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
default:
plog.Panicf("unknown stresser type: %s\n", s)
}

View File

@ -16,12 +16,14 @@ package main
import (
"fmt"
"os"
"time"
)
type tester struct {
cluster *cluster
limit int
exitOnFailure bool
failures []failure
status Status
@ -49,6 +51,7 @@ func (tt *tester) runLoop() {
if err := tt.resetStressCheck(); err != nil {
plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err)
tt.failed()
return
}
@ -87,6 +90,7 @@ func (tt *tester) runLoop() {
if round > 0 && round%500 == 0 { // every 500 rounds
if err := tt.defrag(); err != nil {
plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
tt.failed()
return
}
}
@ -114,7 +118,7 @@ func (tt *tester) doRound(round int) error {
return fmt.Errorf("recovery error: %v", err)
}
plog.Infof("%s recovered failure", tt.logPrefix())
tt.cancelStresser()
tt.pauseStresser()
plog.Infof("%s wait until cluster is healthy", tt.logPrefix())
if err := tt.cluster.WaitHealth(); err != nil {
return fmt.Errorf("wait full health error: %v", err)
@ -161,7 +165,7 @@ func (tt *tester) checkConsistency() (err error) {
}
func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
tt.cancelStresser()
tt.pauseStresser()
defer func() {
if err == nil {
err = tt.startStresser()
@ -209,7 +213,18 @@ func (tt *tester) logPrefix() string {
return prefix
}
func (tt *tester) failed() {
if !tt.exitOnFailure {
return
}
plog.Warningf("%s exiting on failure", tt.logPrefix())
tt.cluster.Terminate()
os.Exit(2)
}
func (tt *tester) cleanup() error {
defer tt.failed()
roundFailedTotalCounter.Inc()
desc := "compact/defrag"
if tt.status.Case != -1 {
@ -217,7 +232,7 @@ func (tt *tester) cleanup() error {
}
caseFailedTotalCounter.WithLabelValues(desc).Inc()
tt.cancelStresser()
tt.closeStresser()
if err := tt.cluster.Cleanup(); err != nil {
plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
return err
@ -229,10 +244,10 @@ func (tt *tester) cleanup() error {
return tt.resetStressCheck()
}
func (tt *tester) cancelStresser() {
plog.Infof("%s canceling the stressers...", tt.logPrefix())
tt.stresser.Cancel()
plog.Infof("%s canceled stressers", tt.logPrefix())
func (tt *tester) pauseStresser() {
plog.Infof("%s pausing the stressers...", tt.logPrefix())
tt.stresser.Pause()
plog.Infof("%s paused stressers", tt.logPrefix())
}
func (tt *tester) startStresser() (err error) {
@ -242,6 +257,12 @@ func (tt *tester) startStresser() (err error) {
return err
}
func (tt *tester) closeStresser() {
plog.Infof("%s closing the stressers...", tt.logPrefix())
tt.stresser.Close()
plog.Infof("%s closed stressers", tt.logPrefix())
}
func (tt *tester) resetStressCheck() error {
plog.Infof("%s resetting stressers and checkers...", tt.logPrefix())
cs := &compositeStresser{}

View File

@ -93,11 +93,15 @@ func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
}
}
func (s *v2Stresser) Cancel() {
func (s *v2Stresser) Pause() {
s.cancel()
s.wg.Wait()
}
func (s *v2Stresser) Close() {
s.Pause()
}
func (s *v2Stresser) ModifiedKeys() int64 {
return atomic.LoadInt64(&s.atomicModifiedKey)
}