diff --git a/tools/functional-tester/etcd-tester/checks.go b/tools/functional-tester/etcd-tester/checks.go deleted file mode 100644 index 81a659ac2..000000000 --- a/tools/functional-tester/etcd-tester/checks.go +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "fmt" - "time" - - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "google.golang.org/grpc" -) - -const ( - retries = 7 -) - -type Checker interface { - // Check returns an error if the system fails a consistency check. - Check() error -} - -type hashAndRevGetter interface { - getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error) -} - -type hashChecker struct { - hrg hashAndRevGetter -} - -func newHashChecker(hrg hashAndRevGetter) Checker { return &hashChecker{hrg} } - -const leaseCheckerTimeout = 10 * time.Second - -func (hc *hashChecker) checkRevAndHashes() (err error) { - var ( - revs map[string]int64 - hashes map[string]int64 - ) - - // retries in case of transient failure or etcd cluster has not stablized yet. - for i := 0; i < retries; i++ { - revs, hashes, err = hc.hrg.getRevisionHash() - if err != nil { - plog.Warningf("retry %d. failed to retrieve revison and hash (%v)", i, err) - } else { - sameRev := getSameValue(revs) - sameHashes := getSameValue(hashes) - if sameRev && sameHashes { - return nil - } - plog.Warningf("retry %d. etcd cluster is not stable: [revisions: %v] and [hashes: %v]", i, revs, hashes) - } - time.Sleep(time.Second) - } - - if err != nil { - return fmt.Errorf("failed revision and hash check (%v)", err) - } - - return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes) -} - -func (hc *hashChecker) Check() error { - return hc.checkRevAndHashes() -} - -type leaseChecker struct { - endpoint string - ls *leaseStresser - leaseClient pb.LeaseClient - kvc pb.KVClient -} - -func (lc *leaseChecker) Check() error { - conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1)) - if err != nil { - return fmt.Errorf("%v (%s)", err, lc.ls.endpoint) - } - defer func() { - if conn != nil { - conn.Close() - } - }() - lc.kvc = pb.NewKVClient(conn) - lc.leaseClient = pb.NewLeaseClient(conn) - if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil { - return err - } - if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil { - return err - } - return lc.checkShortLivedLeases() -} - -// checkShortLivedLeases ensures leases expire. -func (lc *leaseChecker) checkShortLivedLeases() error { - ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout) - errc := make(chan error) - defer cancel() - for leaseID := range lc.ls.shortLivedLeases.leases { - go func(id int64) { - errc <- lc.checkShortLivedLease(ctx, id) - }(leaseID) - } - - var errs []error - for range lc.ls.shortLivedLeases.leases { - if err := <-errc; err != nil { - errs = append(errs, err) - } - } - return errsToError(errs) -} - -func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) { - // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it. - var resp *pb.LeaseTimeToLiveResponse - for i := 0; i < retries; i++ { - resp, err = lc.getLeaseByID(ctx, leaseID) - // lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound - if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) { - return nil - } - if err != nil { - plog.Debugf("retry %d. failed to retrieve lease %v error (%v)", i, leaseID, err) - continue - } - if resp.TTL > 0 { - plog.Debugf("lease %v is not expired. sleep for %d until it expires.", leaseID, resp.TTL) - time.Sleep(time.Duration(resp.TTL) * time.Second) - } else { - plog.Debugf("retry %d. lease %v is expired but not yet revoked", i, leaseID) - time.Sleep(time.Second) - } - if err = lc.checkLease(ctx, false, leaseID); err != nil { - continue - } - return nil - } - return err -} - -func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error { - keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID) - if err != nil { - plog.Errorf("hasKeysAttachedToLeaseExpired error %v (endpoint %q)", err, lc.endpoint) - return err - } - leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID) - if err != nil { - plog.Errorf("hasLeaseExpired error %v (endpoint %q)", err, lc.endpoint) - return err - } - if leaseExpired != keysExpired { - return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired) - } - if leaseExpired != expired { - return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired) - } - return nil -} - -func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error { - ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout) - defer cancel() - for leaseID := range leases { - if err := lc.checkLease(ctx, expired, leaseID); err != nil { - return err - } - } - return nil -} - -func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) { - ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true} - return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false)) -} - -func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { - // keep retrying until lease's state is known or ctx is being canceled - for ctx.Err() == nil { - resp, err := lc.getLeaseByID(ctx, leaseID) - if err != nil { - // for ~v3.1 compatibilities - if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { - return true, nil - } - } else { - return resp.TTL == -1, nil - } - plog.Warningf("hasLeaseExpired %v resp %v error %v (endpoint %q)", leaseID, resp, err, lc.endpoint) - } - return false, ctx.Err() -} - -// The keys attached to the lease has the format of "_" where idx is the ordering key creation -// Since the format of keys contains about leaseID, finding keys base on "" prefix -// determines whether the attached keys for a given leaseID has been deleted or not -func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { - resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("%d", leaseID)), - RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))), - }, grpc.FailFast(false)) - if err != nil { - plog.Errorf("retrieving keys attached to lease %v error %v (endpoint %q)", leaseID, err, lc.endpoint) - return false, err - } - return len(resp.Kvs) == 0, nil -} - -// compositeChecker implements a checker that runs a slice of Checkers concurrently. -type compositeChecker struct{ checkers []Checker } - -func newCompositeChecker(checkers []Checker) Checker { - return &compositeChecker{checkers} -} - -func (cchecker *compositeChecker) Check() error { - errc := make(chan error) - for _, c := range cchecker.checkers { - go func(chk Checker) { errc <- chk.Check() }(c) - } - var errs []error - for range cchecker.checkers { - if err := <-errc; err != nil { - errs = append(errs, err) - } - } - return errsToError(errs) -} - -type runnerChecker struct { - errc chan error -} - -func (rc *runnerChecker) Check() error { - select { - case err := <-rc.errc: - return err - default: - return nil - } -} - -type noChecker struct{} - -func newNoChecker() Checker { return &noChecker{} } -func (nc *noChecker) Check() error { return nil } diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go deleted file mode 100644 index 92cbd418d..000000000 --- a/tools/functional-tester/etcd-tester/cluster.go +++ /dev/null @@ -1,261 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "fmt" - "math/rand" - "net" - "strings" - "time" - - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client" - - "google.golang.org/grpc" -) - -// agentConfig holds information needed to interact/configure an agent and its etcd process -type agentConfig struct { - endpoint string - clientPort int - advertiseClientPort int - peerPort int - advertisePeerPort int - failpointPort int -} - -type cluster struct { - agents []agentConfig - Size int - Members []*member -} - -type ClusterStatus struct { - AgentStatuses map[string]client.Status -} - -func (c *cluster) bootstrap() error { - size := len(c.agents) - - members := make([]*member, size) - memberNameURLs := make([]string, size) - for i, a := range c.agents { - agent, err := client.NewAgent(a.endpoint) - if err != nil { - return err - } - host, _, err := net.SplitHostPort(a.endpoint) - if err != nil { - return err - } - members[i] = &member{ - Agent: agent, - Endpoint: a.endpoint, - Name: fmt.Sprintf("etcd-%d", i), - ClientURL: fmt.Sprintf("http://%s:%d", host, a.clientPort), - AdvertiseClientURL: fmt.Sprintf("http://%s:%d", host, a.advertiseClientPort), - PeerURL: fmt.Sprintf("http://%s:%d", host, a.peerPort), - AdvertisePeerURL: fmt.Sprintf("http://%s:%d", host, a.advertisePeerPort), - FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort), - } - memberNameURLs[i] = members[i].ClusterEntry() - } - clusterStr := strings.Join(memberNameURLs, ",") - token := fmt.Sprint(rand.Int()) - - for i, m := range members { - flags := append( - m.Flags(), - "--initial-cluster-token", token, - "--initial-cluster", clusterStr, - ) - - if _, err := m.Agent.Start(flags...); err != nil { - // cleanup - for _, m := range members[:i] { - m.Agent.Terminate() - } - return err - } - } - - c.Size = size - c.Members = members - return nil -} - -func (c *cluster) Reset() error { return c.bootstrap() } - -func (c *cluster) WaitHealth() error { - var err error - // wait 60s to check cluster health. - // TODO: set it to a reasonable value. It is set that high because - // follower may use long time to catch up the leader when reboot under - // reasonable workload (https://github.com/coreos/etcd/issues/2698) - for i := 0; i < 60; i++ { - for _, m := range c.Members { - if err = m.SetHealthKeyV3(); err != nil { - break - } - } - if err == nil { - return nil - } - plog.Warningf("#%d setHealthKey error (%v)", i, err) - time.Sleep(time.Second) - } - return err -} - -// GetLeader returns the index of leader and error if any. -func (c *cluster) GetLeader() (int, error) { - for i, m := range c.Members { - isLeader, err := m.IsLeader() - if isLeader || err != nil { - return i, err - } - } - return 0, fmt.Errorf("no leader found") -} - -func (c *cluster) Cleanup() error { - var lasterr error - for _, m := range c.Members { - if err := m.Agent.Cleanup(); err != nil { - lasterr = err - } - } - return lasterr -} - -func (c *cluster) Terminate() { - for _, m := range c.Members { - m.Agent.Terminate() - } -} - -func (c *cluster) Status() ClusterStatus { - cs := ClusterStatus{ - AgentStatuses: make(map[string]client.Status), - } - - for _, m := range c.Members { - s, err := m.Agent.Status() - // TODO: add a.Desc() as a key of the map - desc := m.Endpoint - if err != nil { - cs.AgentStatuses[desc] = client.Status{State: "unknown"} - plog.Printf("failed to get the status of agent [%s]", desc) - } - cs.AgentStatuses[desc] = s - } - return cs -} - -// maxRev returns the maximum revision found on the cluster. -func (c *cluster) maxRev() (rev int64, err error) { - ctx, cancel := context.WithTimeout(context.TODO(), time.Second) - defer cancel() - revc, errc := make(chan int64, len(c.Members)), make(chan error, len(c.Members)) - for i := range c.Members { - go func(m *member) { - mrev, merr := m.Rev(ctx) - revc <- mrev - errc <- merr - }(c.Members[i]) - } - for i := 0; i < len(c.Members); i++ { - if merr := <-errc; merr != nil { - err = merr - } - if mrev := <-revc; mrev > rev { - rev = mrev - } - } - return rev, err -} - -func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) { - revs := make(map[string]int64) - hashes := make(map[string]int64) - for _, m := range c.Members { - rev, hash, err := m.RevHash() - if err != nil { - return nil, nil, err - } - revs[m.ClientURL] = rev - hashes[m.ClientURL] = hash - } - return revs, hashes, nil -} - -func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) { - if rev <= 0 { - return nil - } - - for i, m := range c.Members { - u := m.ClientURL - conn, derr := m.dialGRPC() - if derr != nil { - plog.Printf("[compact kv #%d] dial error %v (endpoint %s)", i, derr, u) - err = derr - continue - } - kvc := pb.NewKVClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - plog.Printf("[compact kv #%d] starting (endpoint %s)", i, u) - _, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false)) - cancel() - conn.Close() - succeed := true - if cerr != nil { - if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 { - plog.Printf("[compact kv #%d] already compacted (endpoint %s)", i, u) - } else { - plog.Warningf("[compact kv #%d] error %v (endpoint %s)", i, cerr, u) - err = cerr - succeed = false - } - } - if succeed { - plog.Printf("[compact kv #%d] done (endpoint %s)", i, u) - } - } - return err -} - -func (c *cluster) checkCompact(rev int64) error { - if rev == 0 { - return nil - } - for _, m := range c.Members { - if err := m.CheckCompact(rev); err != nil { - return err - } - } - return nil -} - -func (c *cluster) defrag() error { - for _, m := range c.Members { - if err := m.Defrag(); err != nil { - return err - } - } - return nil -} diff --git a/tools/functional-tester/etcd-tester/doc.go b/tools/functional-tester/etcd-tester/doc.go deleted file mode 100644 index 1cf0eca00..000000000 --- a/tools/functional-tester/etcd-tester/doc.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// etcd-tester is a single controller for all etcd-agents to manage an etcd cluster and simulate failures. -package main diff --git a/tools/functional-tester/etcd-tester/etcd_runner_stresser.go b/tools/functional-tester/etcd-tester/etcd_runner_stresser.go deleted file mode 100644 index 23636bf5a..000000000 --- a/tools/functional-tester/etcd-tester/etcd_runner_stresser.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2017 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "fmt" - "io/ioutil" - "os/exec" - "syscall" - - "golang.org/x/time/rate" -) - -type runnerStresser struct { - cmd *exec.Cmd - cmdStr string - args []string - rl *rate.Limiter - reqRate int - - errc chan error - donec chan struct{} -} - -func newRunnerStresser(cmdStr string, args []string, rl *rate.Limiter, reqRate int) *runnerStresser { - rl.SetLimit(rl.Limit() - rate.Limit(reqRate)) - return &runnerStresser{ - cmdStr: cmdStr, - args: args, - rl: rl, - reqRate: reqRate, - errc: make(chan error, 1), - donec: make(chan struct{}), - } -} - -func (rs *runnerStresser) setupOnce() (err error) { - if rs.cmd != nil { - return nil - } - - rs.cmd = exec.Command(rs.cmdStr, rs.args...) - stderr, err := rs.cmd.StderrPipe() - if err != nil { - return err - } - - go func() { - defer close(rs.donec) - out, err := ioutil.ReadAll(stderr) - if err != nil { - rs.errc <- err - } else { - rs.errc <- fmt.Errorf("(%v %v) stderr %v", rs.cmdStr, rs.args, string(out)) - } - }() - - return rs.cmd.Start() -} - -func (rs *runnerStresser) Stress() (err error) { - if err = rs.setupOnce(); err != nil { - return err - } - return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT) -} - -func (rs *runnerStresser) Pause() { - syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP) -} - -func (rs *runnerStresser) Close() { - syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT) - rs.cmd.Wait() - <-rs.donec - rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate)) -} - -func (rs *runnerStresser) ModifiedKeys() int64 { - return 1 -} - -func (rs *runnerStresser) Checker() Checker { - return &runnerChecker{rs.errc} -} diff --git a/tools/functional-tester/etcd-tester/failpoint.go b/tools/functional-tester/etcd-tester/failpoint.go deleted file mode 100644 index bfb937436..000000000 --- a/tools/functional-tester/etcd-tester/failpoint.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "fmt" - "io/ioutil" - "net/http" - "strings" - "sync" - "time" -) - -type failpointStats struct { - // crashes counts the number of crashes for a failpoint - crashes map[string]int - // mu protects crashes - mu sync.Mutex -} - -var fpStats failpointStats - -func failpointFailures(c *cluster, failpoints []string) (ret []failure, err error) { - var fps []string - fps, err = failpointPaths(c.Members[0].FailpointURL) - if err != nil { - return nil, err - } - // create failure objects for all failpoints - for _, fp := range fps { - if len(fp) == 0 { - continue - } - fpFails := failuresFromFailpoint(fp, failpoints) - // wrap in delays so failpoint has time to trigger - for i, fpf := range fpFails { - if strings.Contains(fp, "Snap") { - // hack to trigger snapshot failpoints - fpFails[i] = &failureUntilSnapshot{fpf} - } else { - fpFails[i] = &failureDelay{fpf, 3 * time.Second} - } - } - ret = append(ret, fpFails...) - } - fpStats.crashes = make(map[string]int) - return ret, err -} - -func failpointPaths(endpoint string) ([]string, error) { - resp, err := http.Get(endpoint) - if err != nil { - return nil, err - } - defer resp.Body.Close() - body, rerr := ioutil.ReadAll(resp.Body) - if rerr != nil { - return nil, rerr - } - var fps []string - for _, l := range strings.Split(string(body), "\n") { - fp := strings.Split(l, "=")[0] - fps = append(fps, fp) - } - return fps, nil -} - -// failpoints follows FreeBSD KFAIL_POINT syntax. -// e.g. panic("etcd-tester"),1*sleep(1000)->panic("etcd-tester") -func failuresFromFailpoint(fp string, failpoints []string) (fs []failure) { - recov := makeRecoverFailpoint(fp) - for _, failpoint := range failpoints { - inject := makeInjectFailpoint(fp, failpoint) - fs = append(fs, []failure{ - &failureOne{ - description: description(fmt.Sprintf("failpoint %s (one: %s)", fp, failpoint)), - injectMember: inject, - recoverMember: recov, - }, - &failureAll{ - description: description(fmt.Sprintf("failpoint %s (all: %s)", fp, failpoint)), - injectMember: inject, - recoverMember: recov, - }, - &failureMajority{ - description: description(fmt.Sprintf("failpoint %s (majority: %s)", fp, failpoint)), - injectMember: inject, - recoverMember: recov, - }, - &failureLeader{ - failureByFunc{ - description: description(fmt.Sprintf("failpoint %s (leader: %s)", fp, failpoint)), - injectMember: inject, - recoverMember: recov, - }, - 0, - }, - }...) - } - return fs -} - -func makeInjectFailpoint(fp, val string) injectMemberFunc { - return func(m *member) (err error) { - return putFailpoint(m.FailpointURL, fp, val) - } -} - -func makeRecoverFailpoint(fp string) recoverMemberFunc { - return func(m *member) error { - if err := delFailpoint(m.FailpointURL, fp); err == nil { - return nil - } - // node not responding, likely dead from fp panic; restart - fpStats.mu.Lock() - fpStats.crashes[fp]++ - fpStats.mu.Unlock() - return recoverStop(m) - } -} - -func putFailpoint(ep, fp, val string) error { - req, _ := http.NewRequest(http.MethodPut, ep+"/"+fp, strings.NewReader(val)) - c := http.Client{} - resp, err := c.Do(req) - if err != nil { - return err - } - resp.Body.Close() - if resp.StatusCode/100 != 2 { - return fmt.Errorf("failed to PUT %s=%s at %s (%v)", fp, val, ep, resp.Status) - } - return nil -} - -func delFailpoint(ep, fp string) error { - req, _ := http.NewRequest(http.MethodDelete, ep+"/"+fp, strings.NewReader("")) - c := http.Client{} - resp, err := c.Do(req) - if err != nil { - return err - } - resp.Body.Close() - if resp.StatusCode/100 != 2 { - return fmt.Errorf("failed to DELETE %s at %s (%v)", fp, ep, resp.Status) - } - return nil -} diff --git a/tools/functional-tester/etcd-tester/failure.go b/tools/functional-tester/etcd-tester/failure.go deleted file mode 100644 index 098c0e839..000000000 --- a/tools/functional-tester/etcd-tester/failure.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "fmt" - "math/rand" - "os/exec" - "time" -) - -type failure interface { - // Inject injeccts the failure into the testing cluster at the given - // round. When calling the function, the cluster should be in health. - Inject(c *cluster, round int) error - // Recover recovers the injected failure caused by the injection of the - // given round and wait for the recovery of the testing cluster. - Recover(c *cluster, round int) error - // Desc returns a description of the failure - Desc() string -} - -type description string - -func (d description) Desc() string { return string(d) } - -type injectMemberFunc func(*member) error -type recoverMemberFunc func(*member) error - -type failureByFunc struct { - description - injectMember injectMemberFunc - recoverMember recoverMemberFunc -} - -type failureOne failureByFunc -type failureAll failureByFunc -type failureMajority failureByFunc -type failureLeader struct { - failureByFunc - idx int -} - -type failureDelay struct { - failure - delayDuration time.Duration -} - -// failureUntilSnapshot injects a failure and waits for a snapshot event -type failureUntilSnapshot struct{ failure } - -func (f *failureOne) Inject(c *cluster, round int) error { - return f.injectMember(c.Members[round%c.Size]) -} - -func (f *failureOne) Recover(c *cluster, round int) error { - if err := f.recoverMember(c.Members[round%c.Size]); err != nil { - return err - } - return c.WaitHealth() -} - -func (f *failureAll) Inject(c *cluster, round int) error { - for _, m := range c.Members { - if err := f.injectMember(m); err != nil { - return err - } - } - return nil -} - -func (f *failureAll) Recover(c *cluster, round int) error { - for _, m := range c.Members { - if err := f.recoverMember(m); err != nil { - return err - } - } - return c.WaitHealth() -} - -func (f *failureMajority) Inject(c *cluster, round int) error { - for i := range killMap(c.Size, round) { - if err := f.injectMember(c.Members[i]); err != nil { - return err - } - } - return nil -} - -func (f *failureMajority) Recover(c *cluster, round int) error { - for i := range killMap(c.Size, round) { - if err := f.recoverMember(c.Members[i]); err != nil { - return err - } - } - return nil -} - -func (f *failureLeader) Inject(c *cluster, round int) error { - idx, err := c.GetLeader() - if err != nil { - return err - } - f.idx = idx - return f.injectMember(c.Members[idx]) -} - -func (f *failureLeader) Recover(c *cluster, round int) error { - if err := f.recoverMember(c.Members[f.idx]); err != nil { - return err - } - return c.WaitHealth() -} - -func (f *failureDelay) Inject(c *cluster, round int) error { - if err := f.failure.Inject(c, round); err != nil { - return err - } - if f.delayDuration > 0 { - plog.Infof("sleeping delay duration %v for %q", f.delayDuration, f.failure.Desc()) - time.Sleep(f.delayDuration) - } - return nil -} - -func (f *failureUntilSnapshot) Inject(c *cluster, round int) error { - if err := f.failure.Inject(c, round); err != nil { - return err - } - if c.Size < 3 { - return nil - } - // maxRev may fail since failure just injected, retry if failed. - startRev, err := c.maxRev() - for i := 0; i < 10 && startRev == 0; i++ { - startRev, err = c.maxRev() - } - if startRev == 0 { - return err - } - lastRev := startRev - // Normal healthy cluster could accept 1000req/s at least. - // Give it 3-times time to create a new snapshot. - retry := snapshotCount / 1000 * 3 - for j := 0; j < retry; j++ { - lastRev, _ = c.maxRev() - // If the number of proposals committed is bigger than snapshot count, - // a new snapshot should have been created. - if lastRev-startRev > snapshotCount { - return nil - } - time.Sleep(time.Second) - } - return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry) -} - -func (f *failureUntilSnapshot) Desc() string { - return f.failure.Desc() + " for a long time and expect it to recover from an incoming snapshot" -} - -func killMap(size int, seed int) map[int]bool { - m := make(map[int]bool) - r := rand.New(rand.NewSource(int64(seed))) - majority := size/2 + 1 - for { - m[r.Intn(size)] = true - if len(m) >= majority { - return m - } - } -} - -type failureNop failureByFunc - -func (f *failureNop) Inject(c *cluster, round int) error { return nil } -func (f *failureNop) Recover(c *cluster, round int) error { return nil } - -type failureExternal struct { - failure - - description string - scriptPath string -} - -func (f *failureExternal) Inject(c *cluster, round int) error { - return exec.Command(f.scriptPath, "enable", fmt.Sprintf("%d", round)).Run() -} - -func (f *failureExternal) Recover(c *cluster, round int) error { - return exec.Command(f.scriptPath, "disable", fmt.Sprintf("%d", round)).Run() -} - -func (f *failureExternal) Desc() string { return f.description } diff --git a/tools/functional-tester/etcd-tester/failure_agent.go b/tools/functional-tester/etcd-tester/failure_agent.go deleted file mode 100644 index 49dff8ccd..000000000 --- a/tools/functional-tester/etcd-tester/failure_agent.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "fmt" - "time" -) - -const ( - snapshotCount = 10000 - slowNetworkLatency = 500 // 500 millisecond - randomVariation = 50 - - // delay duration to trigger leader election (default election timeout 1s) - triggerElectionDur = 5 * time.Second - - // Wait more when it recovers from slow network, because network layer - // needs extra time to propagate traffic control (tc command) change. - // Otherwise, we get different hash values from the previous revision. - // For more detail, please see https://github.com/coreos/etcd/issues/5121. - waitRecover = 5 * time.Second -) - -func injectStop(m *member) error { return m.Agent.Stop() } -func recoverStop(m *member) error { - _, err := m.Agent.Restart() - return err -} - -func newFailureKillAll() failure { - return &failureAll{ - description: "kill all members", - injectMember: injectStop, - recoverMember: recoverStop, - } -} - -func newFailureKillMajority() failure { - return &failureMajority{ - description: "kill majority of the cluster", - injectMember: injectStop, - recoverMember: recoverStop, - } -} - -func newFailureKillOne() failure { - return &failureOne{ - description: "kill one random member", - injectMember: injectStop, - recoverMember: recoverStop, - } -} - -func newFailureKillLeader() failure { - ff := failureByFunc{ - description: "kill leader member", - injectMember: injectStop, - recoverMember: recoverStop, - } - return &failureLeader{ff, 0} -} - -func newFailureKillOneForLongTime() failure { - return &failureUntilSnapshot{newFailureKillOne()} -} - -func newFailureKillLeaderForLongTime() failure { - return &failureUntilSnapshot{newFailureKillLeader()} -} - -func injectDropPort(m *member) error { return m.Agent.DropPort(m.peerPort()) } -func recoverDropPort(m *member) error { return m.Agent.RecoverPort(m.peerPort()) } - -func newFailureIsolate() failure { - f := &failureOne{ - description: "isolate one member", - injectMember: injectDropPort, - recoverMember: recoverDropPort, - } - return &failureDelay{ - failure: f, - delayDuration: triggerElectionDur, - } -} - -func newFailureIsolateAll() failure { - f := &failureAll{ - description: "isolate all members", - injectMember: injectDropPort, - recoverMember: recoverDropPort, - } - return &failureDelay{ - failure: f, - delayDuration: triggerElectionDur, - } -} - -func injectLatency(m *member) error { - if err := m.Agent.SetLatency(slowNetworkLatency, randomVariation); err != nil { - m.Agent.RemoveLatency() - return err - } - return nil -} - -func recoverLatency(m *member) error { - if err := m.Agent.RemoveLatency(); err != nil { - return err - } - time.Sleep(waitRecover) - return nil -} - -func newFailureSlowNetworkOneMember() failure { - desc := fmt.Sprintf("slow down one member's network by adding %d ms latency", slowNetworkLatency) - f := &failureOne{ - description: description(desc), - injectMember: injectLatency, - recoverMember: recoverLatency, - } - return &failureDelay{ - failure: f, - delayDuration: triggerElectionDur, - } -} - -func newFailureSlowNetworkLeader() failure { - desc := fmt.Sprintf("slow down leader's network by adding %d ms latency", slowNetworkLatency) - ff := failureByFunc{ - description: description(desc), - injectMember: injectLatency, - recoverMember: recoverLatency, - } - f := &failureLeader{ff, 0} - return &failureDelay{ - failure: f, - delayDuration: triggerElectionDur, - } -} - -func newFailureSlowNetworkAll() failure { - f := &failureAll{ - description: "slow down all members' network", - injectMember: injectLatency, - recoverMember: recoverLatency, - } - return &failureDelay{ - failure: f, - delayDuration: triggerElectionDur, - } -} - -func newFailureNop() failure { - return &failureNop{ - description: "no failure", - } -} - -func newFailureExternal(scriptPath string) failure { - return &failureExternal{ - description: fmt.Sprintf("external fault injector (script: %s)", scriptPath), - scriptPath: scriptPath, - } -} diff --git a/tools/functional-tester/etcd-tester/http.go b/tools/functional-tester/etcd-tester/http.go deleted file mode 100644 index a9d9a30a8..000000000 --- a/tools/functional-tester/etcd-tester/http.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "encoding/json" - "net/http" -) - -type statusHandler struct { - status *Status -} - -func (sh statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - en := json.NewEncoder(w) - - sh.status.mu.Lock() - defer sh.status.mu.Unlock() - - if err := en.Encode(Status{ - Since: sh.status.Since, - Failures: sh.status.Failures, - RoundLimit: sh.status.RoundLimit, - Cluster: sh.status.cluster.Status(), - cluster: sh.status.cluster, - Round: sh.status.Round, - Case: sh.status.Case, - }); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} diff --git a/tools/functional-tester/etcd-tester/key_stresser.go b/tools/functional-tester/etcd-tester/key_stresser.go deleted file mode 100644 index 78f1bcc3f..000000000 --- a/tools/functional-tester/etcd-tester/key_stresser.go +++ /dev/null @@ -1,331 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "fmt" - "math/rand" - "sync" - "sync/atomic" - "time" - - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "golang.org/x/time/rate" - "google.golang.org/grpc" - "google.golang.org/grpc/transport" -) - -type keyStresser struct { - Endpoint string - - keyLargeSize int - keySize int - keySuffixRange int - keyTxnSuffixRange int - keyTxnOps int - - N int - - rateLimiter *rate.Limiter - - wg sync.WaitGroup - - cancel func() - conn *grpc.ClientConn - // atomicModifiedKeys records the number of keys created and deleted by the stresser. - atomicModifiedKeys int64 - - stressTable *stressTable -} - -func (s *keyStresser) Stress() error { - // TODO: add backoff option - conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure()) - if err != nil { - return fmt.Errorf("%v (%s)", err, s.Endpoint) - } - ctx, cancel := context.WithCancel(context.Background()) - - s.wg.Add(s.N) - s.conn = conn - s.cancel = cancel - - kvc := pb.NewKVClient(conn) - - var stressEntries = []stressEntry{ - {weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)}, - { - weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize), - f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize), - }, - {weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)}, - } - if s.keyTxnSuffixRange > 0 { - // adjust to make up ±70% of workloads with writes - stressEntries[0].weight = 0.35 - stressEntries = append(stressEntries, stressEntry{ - weight: 0.35, - f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps), - }) - } - s.stressTable = createStressTable(stressEntries) - - for i := 0; i < s.N; i++ { - go s.run(ctx) - } - - plog.Infof("keyStresser %q is started", s.Endpoint) - return nil -} - -func (s *keyStresser) run(ctx context.Context) { - defer s.wg.Done() - - for { - if err := s.rateLimiter.Wait(ctx); err == context.Canceled { - return - } - - // TODO: 10-second is enough timeout to cover leader failure - // and immediate leader election. Find out what other cases this - // could be timed out. - sctx, scancel := context.WithTimeout(ctx, 10*time.Second) - err, modifiedKeys := s.stressTable.choose()(sctx) - scancel() - if err == nil { - atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys) - continue - } - - switch rpctypes.ErrorDesc(err) { - case context.DeadlineExceeded.Error(): - // This retries when request is triggered at the same time as - // leader failure. When we terminate the leader, the request to - // that leader cannot be processed, and times out. Also requests - // to followers cannot be forwarded to the old leader, so timing out - // as well. We want to keep stressing until the cluster elects a - // new leader and start processing requests again. - case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error(): - // This retries when request is triggered at the same time as - // leader failure and follower nodes receive time out errors - // from losing their leader. Followers should retry to connect - // to the new leader. - case etcdserver.ErrStopped.Error(): - // one of the etcd nodes stopped from failure injection - case transport.ErrConnClosing.Desc: - // server closed the transport (failure injected node) - case rpctypes.ErrNotCapable.Error(): - // capability check has not been done (in the beginning) - case rpctypes.ErrTooManyRequests.Error(): - // hitting the recovering member. - case context.Canceled.Error(): - // from stresser.Cancel method: - return - case grpc.ErrClientConnClosing.Error(): - // from stresser.Cancel method: - return - default: - plog.Errorf("keyStresser %v exited with error (%v)", s.Endpoint, err) - return - } - } -} - -func (s *keyStresser) Pause() { - s.Close() -} - -func (s *keyStresser) Close() { - s.cancel() - s.conn.Close() - s.wg.Wait() - plog.Infof("keyStresser %q is closed", s.Endpoint) - -} - -func (s *keyStresser) ModifiedKeys() int64 { - return atomic.LoadInt64(&s.atomicModifiedKeys) -} - -func (s *keyStresser) Checker() Checker { return nil } - -type stressFunc func(ctx context.Context) (err error, modifiedKeys int64) - -type stressEntry struct { - weight float32 - f stressFunc -} - -type stressTable struct { - entries []stressEntry - sumWeights float32 -} - -func createStressTable(entries []stressEntry) *stressTable { - st := stressTable{entries: entries} - for _, entry := range st.entries { - st.sumWeights += entry.weight - } - return &st -} - -func (st *stressTable) choose() stressFunc { - v := rand.Float32() * st.sumWeights - var sum float32 - var idx int - for i := range st.entries { - sum += st.entries[i].weight - if sum >= v { - idx = i - break - } - } - return st.entries[idx].f -} - -func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { - return func(ctx context.Context) (error, int64) { - _, err := kvc.Put(ctx, &pb.PutRequest{ - Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), - Value: randBytes(keySize), - }, grpc.FailFast(false)) - return err, 1 - } -} - -func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc { - keys := make([]string, keyTxnSuffixRange) - for i := range keys { - keys[i] = fmt.Sprintf("/k%03d", i) - } - return writeTxn(kvc, keys, txnOps) -} - -func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc { - return func(ctx context.Context) (error, int64) { - ks := make(map[string]struct{}, txnOps) - for len(ks) != txnOps { - ks[keys[rand.Intn(len(keys))]] = struct{}{} - } - selected := make([]string, 0, txnOps) - for k := range ks { - selected = append(selected, k) - } - com, delOp, putOp := getTxnReqs(selected[0], "bar00") - txnReq := &pb.TxnRequest{ - Compare: []*pb.Compare{com}, - Success: []*pb.RequestOp{delOp}, - Failure: []*pb.RequestOp{putOp}, - } - - // add nested txns if any - for i := 1; i < txnOps; i++ { - k, v := selected[i], fmt.Sprintf("bar%02d", i) - com, delOp, putOp = getTxnReqs(k, v) - nested := &pb.RequestOp{ - Request: &pb.RequestOp_RequestTxn{ - RequestTxn: &pb.TxnRequest{ - Compare: []*pb.Compare{com}, - Success: []*pb.RequestOp{delOp}, - Failure: []*pb.RequestOp{putOp}, - }, - }, - } - txnReq.Success = append(txnReq.Success, nested) - txnReq.Failure = append(txnReq.Failure, nested) - } - - _, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false)) - return err, int64(txnOps) - } -} - -func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) { - // if key exists (version > 0) - com = &pb.Compare{ - Key: []byte(key), - Target: pb.Compare_VERSION, - Result: pb.Compare_GREATER, - TargetUnion: &pb.Compare_Version{Version: 0}, - } - delOp = &pb.RequestOp{ - Request: &pb.RequestOp_RequestDeleteRange{ - RequestDeleteRange: &pb.DeleteRangeRequest{ - Key: []byte(key), - }, - }, - } - putOp = &pb.RequestOp{ - Request: &pb.RequestOp_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte(key), - Value: []byte(val), - }, - }, - } - return com, delOp, putOp -} - -func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { - return func(ctx context.Context) (error, int64) { - _, err := kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), - }, grpc.FailFast(false)) - return err, 0 - } -} - -func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { - return func(ctx context.Context) (error, int64) { - start := rand.Intn(keySuffixRange) - end := start + 500 - _, err := kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", start)), - RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), - }, grpc.FailFast(false)) - return err, 0 - } -} - -func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc { - return func(ctx context.Context) (error, int64) { - _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), - }, grpc.FailFast(false)) - return err, 1 - } -} - -func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { - return func(ctx context.Context) (error, int64) { - start := rand.Intn(keySuffixRange) - end := start + 500 - resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ - Key: []byte(fmt.Sprintf("foo%016x", start)), - RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), - }, grpc.FailFast(false)) - if err == nil { - return nil, resp.Deleted - } - return err, 0 - } -} diff --git a/tools/functional-tester/etcd-tester/lease_stresser.go b/tools/functional-tester/etcd-tester/lease_stresser.go deleted file mode 100644 index ea15fd7a7..000000000 --- a/tools/functional-tester/etcd-tester/lease_stresser.go +++ /dev/null @@ -1,382 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "fmt" - "math/rand" - "sync" - "sync/atomic" - "time" - - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - - "golang.org/x/time/rate" - "google.golang.org/grpc" -) - -const ( - // time to live for lease - TTL = 120 - TTLShort = 2 -) - -type leaseStresser struct { - endpoint string - cancel func() - conn *grpc.ClientConn - kvc pb.KVClient - lc pb.LeaseClient - ctx context.Context - - rateLimiter *rate.Limiter - // atomicModifiedKey records the number of keys created and deleted during a test case - atomicModifiedKey int64 - numLeases int - keysPerLease int - - aliveLeases *atomicLeases - revokedLeases *atomicLeases - shortLivedLeases *atomicLeases - - runWg sync.WaitGroup - aliveWg sync.WaitGroup -} - -type atomicLeases struct { - // rwLock is used to protect read/write access of leases map - // which are accessed and modified by different go routines. - rwLock sync.RWMutex - leases map[int64]time.Time -} - -func (al *atomicLeases) add(leaseID int64, t time.Time) { - al.rwLock.Lock() - al.leases[leaseID] = t - al.rwLock.Unlock() -} - -func (al *atomicLeases) update(leaseID int64, t time.Time) { - al.rwLock.Lock() - _, ok := al.leases[leaseID] - if ok { - al.leases[leaseID] = t - } - al.rwLock.Unlock() -} - -func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) { - al.rwLock.RLock() - rv, ok = al.leases[leaseID] - al.rwLock.RUnlock() - return rv, ok -} - -func (al *atomicLeases) remove(leaseID int64) { - al.rwLock.Lock() - delete(al.leases, leaseID) - al.rwLock.Unlock() -} - -func (al *atomicLeases) getLeasesMap() map[int64]time.Time { - leasesCopy := make(map[int64]time.Time) - al.rwLock.RLock() - for k, v := range al.leases { - leasesCopy[k] = v - } - al.rwLock.RUnlock() - return leasesCopy -} - -func (ls *leaseStresser) setupOnce() error { - if ls.aliveLeases != nil { - return nil - } - if ls.numLeases == 0 { - panic("expect numLeases to be set") - } - if ls.keysPerLease == 0 { - panic("expect keysPerLease to be set") - } - - ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)} - - return nil -} - -func (ls *leaseStresser) Stress() error { - plog.Infof("lease Stresser %v starting ...", ls.endpoint) - if err := ls.setupOnce(); err != nil { - return err - } - - conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second)) - if err != nil { - return fmt.Errorf("%v (%s)", err, ls.endpoint) - } - ls.conn = conn - ls.kvc = pb.NewKVClient(conn) - ls.lc = pb.NewLeaseClient(conn) - ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)} - ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)} - - ctx, cancel := context.WithCancel(context.Background()) - ls.cancel = cancel - ls.ctx = ctx - - ls.runWg.Add(1) - go ls.run() - return nil -} - -func (ls *leaseStresser) run() { - defer ls.runWg.Done() - ls.restartKeepAlives() - for { - // the number of keys created and deleted is roughly 2x the number of created keys for an iteration. - // the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key. - err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease) - if err == context.Canceled { - return - } - plog.Debugf("creating lease on %v", ls.endpoint) - ls.createLeases() - plog.Debugf("done creating lease on %v", ls.endpoint) - plog.Debugf("dropping lease on %v", ls.endpoint) - ls.randomlyDropLeases() - plog.Debugf("done dropping lease on %v", ls.endpoint) - } -} - -func (ls *leaseStresser) restartKeepAlives() { - for leaseID := range ls.aliveLeases.getLeasesMap() { - ls.aliveWg.Add(1) - go func(id int64) { - ls.keepLeaseAlive(id) - }(leaseID) - } -} - -func (ls *leaseStresser) createLeases() { - ls.createAliveLeases() - ls.createShortLivedLeases() -} - -func (ls *leaseStresser) createAliveLeases() { - neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap()) - var wg sync.WaitGroup - for i := 0; i < neededLeases; i++ { - wg.Add(1) - go func() { - defer wg.Done() - leaseID, err := ls.createLeaseWithKeys(TTL) - if err != nil { - plog.Debugf("lease creation error: (%v)", err) - return - } - ls.aliveLeases.add(leaseID, time.Now()) - // keep track of all the keep lease alive go routines - ls.aliveWg.Add(1) - go ls.keepLeaseAlive(leaseID) - }() - } - wg.Wait() -} - -func (ls *leaseStresser) createShortLivedLeases() { - // one round of createLeases() might not create all the short lived leases we want due to falures. - // thus, we want to create remaining short lived leases in the future round. - neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap()) - var wg sync.WaitGroup - for i := 0; i < neededLeases; i++ { - wg.Add(1) - go func() { - defer wg.Done() - leaseID, err := ls.createLeaseWithKeys(TTLShort) - if err != nil { - return - } - ls.shortLivedLeases.add(leaseID, time.Now()) - }() - } - wg.Wait() -} - -func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) { - leaseID, err := ls.createLease(ttl) - if err != nil { - plog.Debugf("lease creation error: (%v)", err) - return -1, err - } - plog.Debugf("lease %v created ", leaseID) - if err := ls.attachKeysWithLease(leaseID); err != nil { - return -1, err - } - return leaseID, nil -} - -func (ls *leaseStresser) randomlyDropLeases() { - var wg sync.WaitGroup - for l := range ls.aliveLeases.getLeasesMap() { - wg.Add(1) - go func(leaseID int64) { - defer wg.Done() - dropped, err := ls.randomlyDropLease(leaseID) - // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases - // because we can't tell whether the lease is dropped or not. - if err != nil { - plog.Debugf("drop lease %v has failed error (%v)", leaseID, err) - ls.aliveLeases.remove(leaseID) - return - } - if !dropped { - return - } - plog.Debugf("lease %v dropped", leaseID) - ls.revokedLeases.add(leaseID, time.Now()) - ls.aliveLeases.remove(leaseID) - }(l) - } - wg.Wait() -} - -func (ls *leaseStresser) createLease(ttl int64) (int64, error) { - resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl}) - if err != nil { - return -1, err - } - return resp.ID, nil -} - -func (ls *leaseStresser) keepLeaseAlive(leaseID int64) { - defer ls.aliveWg.Done() - ctx, cancel := context.WithCancel(ls.ctx) - stream, err := ls.lc.LeaseKeepAlive(ctx) - defer func() { cancel() }() - for { - select { - case <-time.After(500 * time.Millisecond): - case <-ls.ctx.Done(): - plog.Debugf("keepLeaseAlive lease %v context canceled ", leaseID) - // it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase. - // this scenerio is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase. - // to circumvent that scenerio, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration. - // if it is renewed, this means that invariant checking have at least ttl/2 time before lease exipres which is long enough for the checking to finish. - // if it is not renewed, we remove the lease from the alive map so that the lease doesn't exipre during invariant checking - renewTime, ok := ls.aliveLeases.read(leaseID) - if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) { - ls.aliveLeases.remove(leaseID) - plog.Debugf("keepLeaseAlive lease %v has not been renewed. drop it.", leaseID) - } - return - } - - if err != nil { - plog.Debugf("keepLeaseAlive lease %v creates stream error: (%v)", leaseID, err) - cancel() - ctx, cancel = context.WithCancel(ls.ctx) - stream, err = ls.lc.LeaseKeepAlive(ctx) - cancel() - continue - } - err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID}) - plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID) - if err != nil { - plog.Debugf("keepLeaseAlive stream sends lease %v error (%v)", leaseID, err) - continue - } - leaseRenewTime := time.Now() - plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeed", leaseID) - respRC, err := stream.Recv() - if err != nil { - plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v)", leaseID, err) - continue - } - // lease expires after TTL become 0 - // don't send keepalive if the lease has expired - if respRC.TTL <= 0 { - plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0", leaseID) - ls.aliveLeases.remove(leaseID) - return - } - // renew lease timestamp only if lease is present - plog.Debugf("keepLeaseAlive renew lease %v", leaseID) - ls.aliveLeases.update(leaseID, leaseRenewTime) - } -} - -// attachKeysWithLease function attaches keys to the lease. -// the format of key is the concat of leaseID + '_' + '' -// e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key -func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error { - var txnPuts []*pb.RequestOp - for j := 0; j < ls.keysPerLease; j++ { - txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)), - Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}} - txnPuts = append(txnPuts, txnput) - } - // keep retrying until lease is not found or ctx is being canceled - for ls.ctx.Err() == nil { - txn := &pb.TxnRequest{Success: txnPuts} - _, err := ls.kvc.Txn(ls.ctx, txn) - if err == nil { - // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys - atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease)) - return nil - } - if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { - return err - } - } - return ls.ctx.Err() -} - -// randomlyDropLease drops the lease only when the rand.Int(2) returns 1. -// This creates a 50/50 percents chance of dropping a lease -func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) { - if rand.Intn(2) != 0 { - return false, nil - } - // keep retrying until a lease is dropped or ctx is being canceled - for ls.ctx.Err() == nil { - _, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID}) - if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { - return true, nil - } - } - plog.Debugf("randomlyDropLease error: (%v)", ls.ctx.Err()) - return false, ls.ctx.Err() -} - -func (ls *leaseStresser) Pause() { - ls.Close() -} - -func (ls *leaseStresser) Close() { - plog.Debugf("lease stresser %q is closing...", ls.endpoint) - ls.cancel() - ls.runWg.Wait() - ls.aliveWg.Wait() - ls.conn.Close() - plog.Infof("lease stresser %q is closed", ls.endpoint) -} - -func (ls *leaseStresser) ModifiedKeys() int64 { - return atomic.LoadInt64(&ls.atomicModifiedKey) -} - -func (ls *leaseStresser) Checker() Checker { return &leaseChecker{endpoint: ls.endpoint, ls: ls} } diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go deleted file mode 100644 index 82adb362f..000000000 --- a/tools/functional-tester/etcd-tester/main.go +++ /dev/null @@ -1,232 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "flag" - "fmt" - "io/ioutil" - "net/http" - "os" - "strings" - - "github.com/coreos/etcd/pkg/debugutil" - - "github.com/coreos/pkg/capnslog" - "github.com/prometheus/client_golang/prometheus/promhttp" - "golang.org/x/time/rate" - "google.golang.org/grpc/grpclog" -) - -var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester") - -const ( - defaultClientPort = 2379 - defaultPeerPort = 2380 - defaultFailpointPort = 2381 -) - -func main() { - endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.") - clientPorts := flag.String("client-ports", "", "etcd client port for each agent endpoint") - advertiseClientPorts := flag.String("advertise-client-ports", "", "etcd advertise client port for each agent endpoint") - peerPorts := flag.String("peer-ports", "", "etcd peer port for each agent endpoint") - advertisePeerPorts := flag.String("advertise-peer-ports", "", "etcd advertise peer port for each agent endpoint") - failpointPorts := flag.String("failpoint-ports", "", "etcd failpoint port for each agent endpoint") - - stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.") - stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.") - stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.") - stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).") - stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).") - limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).") - exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure") - stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.") - schedCases := flag.String("schedule-cases", "", "test case schedule") - consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)") - stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressing clients (keys, lease, v2keys, nop, election-runner, watch-runner, lock-racer-runner, lease-runner).") - etcdRunnerPath := flag.String("etcd-runner", "", "specify a path of etcd runner binary") - failureTypes := flag.String("failures", "default,failpoints", "specify failures (concat of \"default\" and \"failpoints\").") - failpoints := flag.String("failpoints", `panic("etcd-tester")`, `comma separated list of failpoint terms to inject (e.g. 'panic("etcd-tester"),1*sleep(1000)')`) - externalFailures := flag.String("external-failures", "", "specify a path of script for enabling/disabling an external fault injector") - enablePprof := flag.Bool("enable-pprof", false, "true to enable pprof") - flag.Parse() - - // to discard gRPC-side balancer logs - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard)) - - eps := strings.Split(*endpointStr, ",") - cports := portsFromArg(*clientPorts, len(eps), defaultClientPort) - acports := portsFromArg(*advertiseClientPorts, len(eps), defaultClientPort) - pports := portsFromArg(*peerPorts, len(eps), defaultPeerPort) - apports := portsFromArg(*advertisePeerPorts, len(eps), defaultPeerPort) - fports := portsFromArg(*failpointPorts, len(eps), defaultFailpointPort) - agents := make([]agentConfig, len(eps)) - - for i := range eps { - agents[i].endpoint = eps[i] - agents[i].clientPort = cports[i] - agents[i].advertiseClientPort = acports[i] - agents[i].peerPort = pports[i] - agents[i].advertisePeerPort = apports[i] - agents[i].failpointPort = fports[i] - } - - c := &cluster{agents: agents} - if err := c.bootstrap(); err != nil { - plog.Fatal(err) - } - defer c.Terminate() - - // ensure cluster is fully booted to know failpoints are available - c.WaitHealth() - - var failures []failure - - if failureTypes != nil && *failureTypes != "" { - types, failpoints := strings.Split(*failureTypes, ","), strings.Split(*failpoints, ",") - failures = makeFailures(types, failpoints, c) - } - - if externalFailures != nil && *externalFailures != "" { - if len(failures) != 0 { - plog.Errorf("specify only one of -failures or -external-failures") - os.Exit(1) - } - failures = append(failures, newFailureExternal(*externalFailures)) - } - - if len(failures) == 0 { - plog.Infof("no failures\n") - failures = append(failures, newFailureNop()) - } - - schedule := failures - if schedCases != nil && *schedCases != "" { - cases := strings.Split(*schedCases, " ") - schedule = make([]failure, len(cases)) - for i := range cases { - caseNum := 0 - n, err := fmt.Sscanf(cases[i], "%d", &caseNum) - if n == 0 || err != nil { - plog.Fatalf(`couldn't parse case "%s" (%v)`, cases[i], err) - } - schedule[i] = failures[caseNum] - } - } - - scfg := stressConfig{ - rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS), - keyLargeSize: int(*stressKeyLargeSize), - keySize: int(*stressKeySize), - keySuffixRange: int(*stressKeySuffixRange), - keyTxnSuffixRange: int(*stressKeyTxnSuffixRange), - keyTxnOps: int(*stressKeyTxnOps), - numLeases: 10, - keysPerLease: 10, - - etcdRunnerPath: *etcdRunnerPath, - } - if scfg.keyTxnSuffixRange > 100 { - plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange) - } - if scfg.keyTxnOps > 64 { - plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps) - } - - t := &tester{ - failures: schedule, - cluster: c, - limit: *limit, - exitOnFailure: *exitOnFailure, - - scfg: scfg, - stresserType: *stresserType, - doChecks: *consistencyCheck, - } - - sh := statusHandler{status: &t.status} - http.Handle("/status", sh) - http.Handle("/metrics", promhttp.Handler()) - - if *enablePprof { - for p, h := range debugutil.PProfHandlers() { - http.Handle(p, h) - } - } - - go func() { plog.Fatal(http.ListenAndServe(":9028", nil)) }() - - t.runLoop() -} - -// portsFromArg converts a comma separated list into a slice of ints -func portsFromArg(arg string, n, defaultPort int) []int { - ret := make([]int, n) - if len(arg) == 0 { - for i := range ret { - ret[i] = defaultPort - } - return ret - } - s := strings.Split(arg, ",") - if len(s) != n { - fmt.Printf("expected %d ports, got %d (%s)\n", n, len(s), arg) - os.Exit(1) - } - for i := range s { - if _, err := fmt.Sscanf(s[i], "%d", &ret[i]); err != nil { - fmt.Println(err) - os.Exit(1) - } - } - return ret -} - -func makeFailures(types, failpoints []string, c *cluster) []failure { - var failures []failure - for i := range types { - switch types[i] { - case "default": - defaultFailures := []failure{ - newFailureKillAll(), - newFailureKillMajority(), - newFailureKillOne(), - newFailureKillLeader(), - newFailureKillOneForLongTime(), - newFailureKillLeaderForLongTime(), - newFailureIsolate(), - newFailureIsolateAll(), - newFailureSlowNetworkOneMember(), - newFailureSlowNetworkLeader(), - newFailureSlowNetworkAll(), - } - failures = append(failures, defaultFailures...) - - case "failpoints": - fpFailures, fperr := failpointFailures(c, failpoints) - if len(fpFailures) == 0 { - plog.Infof("no failpoints found (%v)", fperr) - } - failures = append(failures, fpFailures...) - - default: - plog.Errorf("unknown failure: %s\n", types[i]) - os.Exit(1) - } - } - - return failures -} diff --git a/tools/functional-tester/etcd-tester/member.go b/tools/functional-tester/etcd-tester/member.go deleted file mode 100644 index 2e3eef4db..000000000 --- a/tools/functional-tester/etcd-tester/member.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "fmt" - "net" - "net/url" - "time" - - "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client" - - "google.golang.org/grpc" -) - -type member struct { - Agent client.Agent - Endpoint string - Name string - ClientURL string - AdvertiseClientURL string - PeerURL string - AdvertisePeerURL string - FailpointURL string -} - -func (m *member) ClusterEntry() string { return m.Name + "=" + m.AdvertisePeerURL } - -func (m *member) Flags() []string { - return []string{ - "--name", m.Name, - "--listen-client-urls", m.ClientURL, - "--advertise-client-urls", m.AdvertiseClientURL, - "--listen-peer-urls", m.PeerURL, - "--initial-advertise-peer-urls", m.AdvertisePeerURL, - "--initial-cluster-state", "new", - "--snapshot-count", "10000", - "--pre-vote", - "--experimental-initial-corrupt-check", - } -} - -func (m *member) CheckCompact(rev int64) error { - cli, err := m.newClientV3() - if err != nil { - return fmt.Errorf("%v (endpoint %s)", err, m.AdvertiseClientURL) - } - defer cli.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1)) - wr, ok := <-wch - cancel() - - if !ok { - return fmt.Errorf("watch channel terminated (endpoint %s)", m.AdvertiseClientURL) - } - if wr.CompactRevision != rev { - return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.AdvertiseClientURL) - } - - return nil -} - -func (m *member) Defrag() error { - plog.Printf("defragmenting %s", m.AdvertiseClientURL) - cli, err := m.newClientV3() - if err != nil { - return err - } - defer cli.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - _, err = cli.Defragment(ctx, m.AdvertiseClientURL) - cancel() - if err != nil { - return err - } - plog.Printf("defragmented %s", m.AdvertiseClientURL) - return nil -} - -func (m *member) RevHash() (int64, int64, error) { - conn, err := m.dialGRPC() - if err != nil { - return 0, 0, err - } - mt := pb.NewMaintenanceClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false)) - cancel() - conn.Close() - - if err != nil { - return 0, 0, err - } - - return resp.Header.Revision, int64(resp.Hash), nil -} - -func (m *member) Rev(ctx context.Context) (int64, error) { - cli, err := m.newClientV3() - if err != nil { - return 0, err - } - defer cli.Close() - resp, err := cli.Status(ctx, m.AdvertiseClientURL) - if err != nil { - return 0, err - } - return resp.Header.Revision, nil -} - -func (m *member) IsLeader() (bool, error) { - cli, err := m.newClientV3() - if err != nil { - return false, err - } - defer cli.Close() - resp, err := cli.Status(context.Background(), m.AdvertiseClientURL) - if err != nil { - return false, err - } - return resp.Header.MemberId == resp.Leader, nil -} - -func (m *member) SetHealthKeyV3() error { - cli, err := m.newClientV3() - if err != nil { - return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL) - } - defer cli.Close() - // give enough time-out in case expensive requests (range/delete) are pending - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - _, err = cli.Put(ctx, "health", "good") - cancel() - if err != nil { - return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL) - } - return nil -} - -func (m *member) newClientV3() (*clientv3.Client, error) { - return clientv3.New(clientv3.Config{ - Endpoints: []string{m.AdvertiseClientURL}, - DialTimeout: 5 * time.Second, - }) -} - -func (m *member) dialGRPC() (*grpc.ClientConn, error) { - return grpc.Dial(m.grpcAddr(), grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock()) -} - -// grpcAddr gets the host from clientURL so it works with grpc.Dial() -func (m *member) grpcAddr() string { - u, err := url.Parse(m.AdvertiseClientURL) - if err != nil { - panic(err) - } - return u.Host -} - -func (m *member) peerPort() (port int) { - u, err := url.Parse(m.AdvertisePeerURL) - if err != nil { - panic(err) - } - _, portStr, err := net.SplitHostPort(u.Host) - if err != nil { - panic(err) - } - if _, err = fmt.Sscanf(portStr, "%d", &port); err != nil { - panic(err) - } - return port -} diff --git a/tools/functional-tester/etcd-tester/metrics.go b/tools/functional-tester/etcd-tester/metrics.go deleted file mode 100644 index 7018ba570..000000000 --- a/tools/functional-tester/etcd-tester/metrics.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -var ( - caseTotalCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "etcd", - Subsystem: "funcational_tester", - Name: "case_total", - Help: "Total number of finished test cases", - }, - []string{"desc"}, - ) - - caseFailedTotalCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "etcd", - Subsystem: "funcational_tester", - Name: "case_failed_total", - Help: "Total number of failed test cases", - }, - []string{"desc"}, - ) - - roundTotalCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: "etcd", - Subsystem: "funcational_tester", - Name: "round_total", - Help: "Total number of finished test rounds.", - }) - - roundFailedTotalCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: "etcd", - Subsystem: "funcational_tester", - Name: "round_failed_total", - Help: "Total number of failed test rounds.", - }) -) - -func init() { - prometheus.MustRegister(caseTotalCounter) - prometheus.MustRegister(caseFailedTotalCounter) - prometheus.MustRegister(roundTotalCounter) - prometheus.MustRegister(roundFailedTotalCounter) -} diff --git a/tools/functional-tester/etcd-tester/status.go b/tools/functional-tester/etcd-tester/status.go deleted file mode 100644 index 3721c8076..000000000 --- a/tools/functional-tester/etcd-tester/status.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "sync" - "time" -) - -type Status struct { - Since time.Time - Failures []string - RoundLimit int - - Cluster ClusterStatus - cluster *cluster - - mu sync.Mutex // guards Round and Case - Round int - Case int -} - -func (s *Status) setRound(r int) { - s.mu.Lock() - defer s.mu.Unlock() - s.Round = r -} - -func (s *Status) getRound() int { - s.mu.Lock() - defer s.mu.Unlock() - return s.Round -} - -func (s *Status) setCase(c int) { - s.mu.Lock() - defer s.mu.Unlock() - s.Case = c -} - -func (s *Status) getCase() int { - s.mu.Lock() - defer s.mu.Unlock() - return s.Case -} diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go deleted file mode 100644 index 9c3bddb7c..000000000 --- a/tools/functional-tester/etcd-tester/stresser.go +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "fmt" - "strings" - "sync" - "time" - - "golang.org/x/time/rate" -) - -type Stresser interface { - // Stress starts to stress the etcd cluster - Stress() error - // Pause stops the stresser from sending requests to etcd. Resume by calling Stress. - Pause() - // Close releases all of the Stresser's resources. - Close() - // ModifiedKeys reports the number of keys created and deleted by stresser - ModifiedKeys() int64 - // Checker returns an invariant checker for after the stresser is canceled. - Checker() Checker -} - -// nopStresser implements Stresser that does nothing -type nopStresser struct { - start time.Time - qps int -} - -func (s *nopStresser) Stress() error { return nil } -func (s *nopStresser) Pause() {} -func (s *nopStresser) Close() {} -func (s *nopStresser) ModifiedKeys() int64 { - return 0 -} -func (s *nopStresser) Checker() Checker { return nil } - -// compositeStresser implements a Stresser that runs a slice of -// stressing clients concurrently. -type compositeStresser struct { - stressers []Stresser -} - -func (cs *compositeStresser) Stress() error { - for i, s := range cs.stressers { - if err := s.Stress(); err != nil { - for j := 0; j < i; j++ { - cs.stressers[i].Close() - } - return err - } - } - return nil -} - -func (cs *compositeStresser) Pause() { - var wg sync.WaitGroup - wg.Add(len(cs.stressers)) - for i := range cs.stressers { - go func(s Stresser) { - defer wg.Done() - s.Pause() - }(cs.stressers[i]) - } - wg.Wait() -} - -func (cs *compositeStresser) Close() { - var wg sync.WaitGroup - wg.Add(len(cs.stressers)) - for i := range cs.stressers { - go func(s Stresser) { - defer wg.Done() - s.Close() - }(cs.stressers[i]) - } - wg.Wait() -} - -func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) { - for _, stress := range cs.stressers { - modifiedKey += stress.ModifiedKeys() - } - return modifiedKey -} - -func (cs *compositeStresser) Checker() Checker { - var chks []Checker - for _, s := range cs.stressers { - if chk := s.Checker(); chk != nil { - chks = append(chks, chk) - } - } - if len(chks) == 0 { - return nil - } - return newCompositeChecker(chks) -} - -type stressConfig struct { - keyLargeSize int - keySize int - keySuffixRange int - keyTxnSuffixRange int - keyTxnOps int - - numLeases int - keysPerLease int - - rateLimiter *rate.Limiter - - etcdRunnerPath string -} - -// NewStresser creates stresser from a comma separated list of stresser types. -func NewStresser(s string, sc *stressConfig, m *member) Stresser { - types := strings.Split(s, ",") - if len(types) > 1 { - stressers := make([]Stresser, len(types)) - for i, stype := range types { - stressers[i] = NewStresser(stype, sc, m) - } - return &compositeStresser{stressers} - } - switch s { - case "nop": - return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())} - case "keys": - // TODO: Too intensive stressing clients can panic etcd member with - // 'out of memory' error. Put rate limits in server side. - return &keyStresser{ - Endpoint: m.grpcAddr(), - keyLargeSize: sc.keyLargeSize, - keySize: sc.keySize, - keySuffixRange: sc.keySuffixRange, - keyTxnSuffixRange: sc.keyTxnSuffixRange, - keyTxnOps: sc.keyTxnOps, - N: 100, - rateLimiter: sc.rateLimiter, - } - case "v2keys": - return &v2Stresser{ - Endpoint: m.ClientURL, - keySize: sc.keySize, - keySuffixRange: sc.keySuffixRange, - N: 100, - rateLimiter: sc.rateLimiter, - } - case "lease": - return &leaseStresser{ - endpoint: m.grpcAddr(), - numLeases: sc.numLeases, - keysPerLease: sc.keysPerLease, - rateLimiter: sc.rateLimiter, - } - case "election-runner": - reqRate := 100 - args := []string{ - "election", - fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time - "--dial-timeout=10s", - "--endpoints", m.grpcAddr(), - "--total-client-connections=10", - "--rounds=0", // runs forever - "--req-rate", fmt.Sprintf("%v", reqRate), - } - return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate) - case "watch-runner": - reqRate := 100 - args := []string{ - "watcher", - "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time - "--total-keys=1", - "--total-prefixes=1", - "--watch-per-prefix=1", - "--endpoints", m.grpcAddr(), - "--rounds=0", // runs forever - "--req-rate", fmt.Sprintf("%v", reqRate), - } - return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate) - case "lock-racer-runner": - reqRate := 100 - args := []string{ - "lock-racer", - fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time - "--endpoints", m.grpcAddr(), - "--total-client-connections=10", - "--rounds=0", // runs forever - "--req-rate", fmt.Sprintf("%v", reqRate), - } - return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate) - case "lease-runner": - args := []string{ - "lease-renewer", - "--ttl=30", - "--endpoints", m.grpcAddr(), - } - return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0) - default: - plog.Panicf("unknown stresser type: %s\n", s) - } - return nil // never reach here -} diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go deleted file mode 100644 index 6c910ed83..000000000 --- a/tools/functional-tester/etcd-tester/tester.go +++ /dev/null @@ -1,286 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "fmt" - "os" - "time" -) - -type tester struct { - cluster *cluster - limit int - exitOnFailure bool - - failures []failure - status Status - currentRevision int64 - - stresserType string - scfg stressConfig - doChecks bool - - stresser Stresser - checker Checker -} - -// compactQPS is rough number of compact requests per second. -// Previous tests showed etcd can compact about 60,000 entries per second. -const compactQPS = 50000 - -func (tt *tester) runLoop() { - tt.status.Since = time.Now() - tt.status.RoundLimit = tt.limit - tt.status.cluster = tt.cluster - for _, f := range tt.failures { - tt.status.Failures = append(tt.status.Failures, f.Desc()) - } - - if err := tt.resetStressCheck(); err != nil { - plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err) - tt.failed() - return - } - - var preModifiedKey int64 - for round := 0; round < tt.limit || tt.limit == -1; round++ { - tt.status.setRound(round) - roundTotalCounter.Inc() - - if err := tt.doRound(round); err != nil { - plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err) - if tt.cleanup() != nil { - return - } - // reset preModifiedKey after clean up - preModifiedKey = 0 - continue - } - // -1 so that logPrefix doesn't print out 'case' - tt.status.setCase(-1) - - revToCompact := max(0, tt.currentRevision-10000) - currentModifiedKey := tt.stresser.ModifiedKeys() - modifiedKey := currentModifiedKey - preModifiedKey - preModifiedKey = currentModifiedKey - timeout := 10 * time.Second - timeout += time.Duration(modifiedKey/compactQPS) * time.Second - plog.Infof("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout) - if err := tt.compact(revToCompact, timeout); err != nil { - plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err) - if tt.cleanup() != nil { - return - } - // reset preModifiedKey after clean up - preModifiedKey = 0 - } - if round > 0 && round%500 == 0 { // every 500 rounds - if err := tt.defrag(); err != nil { - plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err) - tt.failed() - return - } - } - } - - plog.Infof("%s functional-tester is finished", tt.logPrefix()) -} - -func (tt *tester) doRound(round int) error { - for j, f := range tt.failures { - caseTotalCounter.WithLabelValues(f.Desc()).Inc() - tt.status.setCase(j) - - if err := tt.cluster.WaitHealth(); err != nil { - return fmt.Errorf("wait full health error: %v", err) - } - plog.Infof("%s injecting failure %q", tt.logPrefix(), f.Desc()) - if err := f.Inject(tt.cluster, round); err != nil { - return fmt.Errorf("injection error: %v", err) - } - plog.Infof("%s injected failure", tt.logPrefix()) - - plog.Infof("%s recovering failure %q", tt.logPrefix(), f.Desc()) - if err := f.Recover(tt.cluster, round); err != nil { - return fmt.Errorf("recovery error: %v", err) - } - plog.Infof("%s recovered failure", tt.logPrefix()) - tt.pauseStresser() - plog.Infof("%s wait until cluster is healthy", tt.logPrefix()) - if err := tt.cluster.WaitHealth(); err != nil { - return fmt.Errorf("wait full health error: %v", err) - } - plog.Infof("%s cluster is healthy", tt.logPrefix()) - - plog.Infof("%s checking consistency and invariant of cluster", tt.logPrefix()) - if err := tt.checkConsistency(); err != nil { - return fmt.Errorf("tt.checkConsistency error (%v)", err) - } - plog.Infof("%s checking consistency and invariant of cluster done", tt.logPrefix()) - - plog.Infof("%s succeed!", tt.logPrefix()) - } - return nil -} - -func (tt *tester) updateRevision() error { - revs, _, err := tt.cluster.getRevisionHash() - for _, rev := range revs { - tt.currentRevision = rev - break // just need get one of the current revisions - } - - plog.Infof("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision) - return err -} - -func (tt *tester) checkConsistency() (err error) { - defer func() { - if err != nil { - return - } - if err = tt.updateRevision(); err != nil { - plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err) - return - } - err = tt.startStresser() - }() - if err = tt.checker.Check(); err != nil { - plog.Infof("%s %v", tt.logPrefix(), err) - } - return err -} - -func (tt *tester) compact(rev int64, timeout time.Duration) (err error) { - tt.pauseStresser() - defer func() { - if err == nil { - err = tt.startStresser() - } - }() - - plog.Infof("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev) - if err = tt.cluster.compactKV(rev, timeout); err != nil { - return err - } - plog.Infof("%s compacted storage (compact revision %d)", tt.logPrefix(), rev) - - plog.Infof("%s checking compaction (compact revision %d)", tt.logPrefix(), rev) - if err = tt.cluster.checkCompact(rev); err != nil { - plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err) - return err - } - - plog.Infof("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev) - return nil -} - -func (tt *tester) defrag() error { - plog.Infof("%s defragmenting...", tt.logPrefix()) - if err := tt.cluster.defrag(); err != nil { - plog.Warningf("%s defrag error (%v)", tt.logPrefix(), err) - if cerr := tt.cleanup(); cerr != nil { - return fmt.Errorf("%s, %s", err, cerr) - } - return err - } - plog.Infof("%s defragmented...", tt.logPrefix()) - return nil -} - -func (tt *tester) logPrefix() string { - var ( - rd = tt.status.getRound() - cs = tt.status.getCase() - prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs) - ) - if cs == -1 { - prefix = fmt.Sprintf("[round#%d]", rd) - } - return prefix -} - -func (tt *tester) failed() { - if !tt.exitOnFailure { - return - } - plog.Warningf("%s exiting on failure", tt.logPrefix()) - tt.cluster.Terminate() - os.Exit(2) -} - -func (tt *tester) cleanup() error { - defer tt.failed() - - roundFailedTotalCounter.Inc() - desc := "compact/defrag" - if tt.status.Case != -1 { - desc = tt.failures[tt.status.Case].Desc() - } - caseFailedTotalCounter.WithLabelValues(desc).Inc() - - tt.closeStresser() - if err := tt.cluster.Cleanup(); err != nil { - plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err) - return err - } - if err := tt.cluster.Reset(); err != nil { - plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err) - return err - } - return tt.resetStressCheck() -} - -func (tt *tester) pauseStresser() { - plog.Infof("%s pausing the stressing clients...", tt.logPrefix()) - tt.stresser.Pause() - plog.Infof("%s paused stressing clients", tt.logPrefix()) -} - -func (tt *tester) startStresser() (err error) { - plog.Infof("%s starting the stressing clients...", tt.logPrefix()) - err = tt.stresser.Stress() - plog.Infof("%s started stressing clients", tt.logPrefix()) - return err -} - -func (tt *tester) closeStresser() { - plog.Infof("%s closing the stressing clients...", tt.logPrefix()) - tt.stresser.Close() - plog.Infof("%s closed stressing clients", tt.logPrefix()) -} - -func (tt *tester) resetStressCheck() error { - plog.Infof("%s resetting stressing clients and checkers...", tt.logPrefix()) - cs := &compositeStresser{} - for _, m := range tt.cluster.Members { - s := NewStresser(tt.stresserType, &tt.scfg, m) - cs.stressers = append(cs.stressers, s) - } - tt.stresser = cs - if !tt.doChecks { - tt.checker = newNoChecker() - return tt.startStresser() - } - chk := newHashChecker(hashAndRevGetter(tt.cluster)) - if schk := cs.Checker(); schk != nil { - chk = newCompositeChecker([]Checker{chk, schk}) - } - tt.checker = chk - return tt.startStresser() -} - -func (tt *tester) Report() int64 { return tt.stresser.ModifiedKeys() } diff --git a/tools/functional-tester/etcd-tester/util.go b/tools/functional-tester/etcd-tester/util.go deleted file mode 100644 index 697ab72ea..000000000 --- a/tools/functional-tester/etcd-tester/util.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "fmt" - "strings" -) - -func getSameValue(vals map[string]int64) bool { - var rv int64 - for _, v := range vals { - if rv == 0 { - rv = v - } - if rv != v { - return false - } - } - return true -} - -func max(n1, n2 int64) int64 { - if n1 > n2 { - return n1 - } - return n2 -} - -func errsToError(errs []error) error { - if len(errs) == 0 { - return nil - } - stringArr := make([]string, len(errs)) - for i, err := range errs { - stringArr[i] = err.Error() - } - return fmt.Errorf(strings.Join(stringArr, ", ")) -} diff --git a/tools/functional-tester/etcd-tester/v2_stresser.go b/tools/functional-tester/etcd-tester/v2_stresser.go deleted file mode 100644 index 620532e0c..000000000 --- a/tools/functional-tester/etcd-tester/v2_stresser.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "fmt" - "math/rand" - "net" - "net/http" - "sync" - "sync/atomic" - "time" - - "golang.org/x/time/rate" - - clientV2 "github.com/coreos/etcd/client" -) - -type v2Stresser struct { - Endpoint string - - keySize int - keySuffixRange int - - N int - - rateLimiter *rate.Limiter - - wg sync.WaitGroup - - atomicModifiedKey int64 - - cancel func() -} - -func (s *v2Stresser) Stress() error { - cfg := clientV2.Config{ - Endpoints: []string{s.Endpoint}, - Transport: &http.Transport{ - Dial: (&net.Dialer{ - Timeout: time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - MaxIdleConnsPerHost: s.N, - }, - } - c, err := clientV2.New(cfg) - if err != nil { - return err - } - - kv := clientV2.NewKeysAPI(c) - ctx, cancel := context.WithCancel(context.Background()) - s.cancel = cancel - s.wg.Add(s.N) - for i := 0; i < s.N; i++ { - go func() { - defer s.wg.Done() - s.run(ctx, kv) - }() - } - return nil -} - -func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) { - for { - if err := s.rateLimiter.Wait(ctx); err == context.Canceled { - return - } - setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout) - key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange)) - _, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil) - if err == nil { - atomic.AddInt64(&s.atomicModifiedKey, 1) - } - setcancel() - if err == context.Canceled { - return - } - } -} - -func (s *v2Stresser) Pause() { - s.cancel() - s.wg.Wait() -} - -func (s *v2Stresser) Close() { - s.Pause() -} - -func (s *v2Stresser) ModifiedKeys() int64 { - return atomic.LoadInt64(&s.atomicModifiedKey) -} - -func (s *v2Stresser) Checker() Checker { return nil } - -func randBytes(size int) []byte { - data := make([]byte, size) - for i := 0; i < size; i++ { - data[i] = byte(int('a') + rand.Intn(26)) - } - return data -}