mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
functional-tester/etcd-tester: remove
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
f7eab57b3c
commit
0e7f48b375
@ -1,264 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
retries = 7
|
||||
)
|
||||
|
||||
type Checker interface {
|
||||
// Check returns an error if the system fails a consistency check.
|
||||
Check() error
|
||||
}
|
||||
|
||||
type hashAndRevGetter interface {
|
||||
getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error)
|
||||
}
|
||||
|
||||
type hashChecker struct {
|
||||
hrg hashAndRevGetter
|
||||
}
|
||||
|
||||
func newHashChecker(hrg hashAndRevGetter) Checker { return &hashChecker{hrg} }
|
||||
|
||||
const leaseCheckerTimeout = 10 * time.Second
|
||||
|
||||
func (hc *hashChecker) checkRevAndHashes() (err error) {
|
||||
var (
|
||||
revs map[string]int64
|
||||
hashes map[string]int64
|
||||
)
|
||||
|
||||
// retries in case of transient failure or etcd cluster has not stablized yet.
|
||||
for i := 0; i < retries; i++ {
|
||||
revs, hashes, err = hc.hrg.getRevisionHash()
|
||||
if err != nil {
|
||||
plog.Warningf("retry %d. failed to retrieve revison and hash (%v)", i, err)
|
||||
} else {
|
||||
sameRev := getSameValue(revs)
|
||||
sameHashes := getSameValue(hashes)
|
||||
if sameRev && sameHashes {
|
||||
return nil
|
||||
}
|
||||
plog.Warningf("retry %d. etcd cluster is not stable: [revisions: %v] and [hashes: %v]", i, revs, hashes)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed revision and hash check (%v)", err)
|
||||
}
|
||||
|
||||
return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes)
|
||||
}
|
||||
|
||||
func (hc *hashChecker) Check() error {
|
||||
return hc.checkRevAndHashes()
|
||||
}
|
||||
|
||||
type leaseChecker struct {
|
||||
endpoint string
|
||||
ls *leaseStresser
|
||||
leaseClient pb.LeaseClient
|
||||
kvc pb.KVClient
|
||||
}
|
||||
|
||||
func (lc *leaseChecker) Check() error {
|
||||
conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1))
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (%s)", err, lc.ls.endpoint)
|
||||
}
|
||||
defer func() {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
lc.kvc = pb.NewKVClient(conn)
|
||||
lc.leaseClient = pb.NewLeaseClient(conn)
|
||||
if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
|
||||
return err
|
||||
}
|
||||
return lc.checkShortLivedLeases()
|
||||
}
|
||||
|
||||
// checkShortLivedLeases ensures leases expire.
|
||||
func (lc *leaseChecker) checkShortLivedLeases() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
|
||||
errc := make(chan error)
|
||||
defer cancel()
|
||||
for leaseID := range lc.ls.shortLivedLeases.leases {
|
||||
go func(id int64) {
|
||||
errc <- lc.checkShortLivedLease(ctx, id)
|
||||
}(leaseID)
|
||||
}
|
||||
|
||||
var errs []error
|
||||
for range lc.ls.shortLivedLeases.leases {
|
||||
if err := <-errc; err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errsToError(errs)
|
||||
}
|
||||
|
||||
func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
|
||||
// retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
|
||||
var resp *pb.LeaseTimeToLiveResponse
|
||||
for i := 0; i < retries; i++ {
|
||||
resp, err = lc.getLeaseByID(ctx, leaseID)
|
||||
// lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound
|
||||
if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
plog.Debugf("retry %d. failed to retrieve lease %v error (%v)", i, leaseID, err)
|
||||
continue
|
||||
}
|
||||
if resp.TTL > 0 {
|
||||
plog.Debugf("lease %v is not expired. sleep for %d until it expires.", leaseID, resp.TTL)
|
||||
time.Sleep(time.Duration(resp.TTL) * time.Second)
|
||||
} else {
|
||||
plog.Debugf("retry %d. lease %v is expired but not yet revoked", i, leaseID)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
if err = lc.checkLease(ctx, false, leaseID); err != nil {
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
|
||||
keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
|
||||
if err != nil {
|
||||
plog.Errorf("hasKeysAttachedToLeaseExpired error %v (endpoint %q)", err, lc.endpoint)
|
||||
return err
|
||||
}
|
||||
leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
|
||||
if err != nil {
|
||||
plog.Errorf("hasLeaseExpired error %v (endpoint %q)", err, lc.endpoint)
|
||||
return err
|
||||
}
|
||||
if leaseExpired != keysExpired {
|
||||
return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
|
||||
}
|
||||
if leaseExpired != expired {
|
||||
return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
|
||||
defer cancel()
|
||||
for leaseID := range leases {
|
||||
if err := lc.checkLease(ctx, expired, leaseID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
|
||||
ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
|
||||
return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
|
||||
}
|
||||
|
||||
func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
|
||||
// keep retrying until lease's state is known or ctx is being canceled
|
||||
for ctx.Err() == nil {
|
||||
resp, err := lc.getLeaseByID(ctx, leaseID)
|
||||
if err != nil {
|
||||
// for ~v3.1 compatibilities
|
||||
if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
|
||||
return true, nil
|
||||
}
|
||||
} else {
|
||||
return resp.TTL == -1, nil
|
||||
}
|
||||
plog.Warningf("hasLeaseExpired %v resp %v error %v (endpoint %q)", leaseID, resp, err, lc.endpoint)
|
||||
}
|
||||
return false, ctx.Err()
|
||||
}
|
||||
|
||||
// The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
|
||||
// Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
|
||||
// determines whether the attached keys for a given leaseID has been deleted or not
|
||||
func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
|
||||
resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("%d", leaseID)),
|
||||
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
|
||||
}, grpc.FailFast(false))
|
||||
if err != nil {
|
||||
plog.Errorf("retrieving keys attached to lease %v error %v (endpoint %q)", leaseID, err, lc.endpoint)
|
||||
return false, err
|
||||
}
|
||||
return len(resp.Kvs) == 0, nil
|
||||
}
|
||||
|
||||
// compositeChecker implements a checker that runs a slice of Checkers concurrently.
|
||||
type compositeChecker struct{ checkers []Checker }
|
||||
|
||||
func newCompositeChecker(checkers []Checker) Checker {
|
||||
return &compositeChecker{checkers}
|
||||
}
|
||||
|
||||
func (cchecker *compositeChecker) Check() error {
|
||||
errc := make(chan error)
|
||||
for _, c := range cchecker.checkers {
|
||||
go func(chk Checker) { errc <- chk.Check() }(c)
|
||||
}
|
||||
var errs []error
|
||||
for range cchecker.checkers {
|
||||
if err := <-errc; err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errsToError(errs)
|
||||
}
|
||||
|
||||
type runnerChecker struct {
|
||||
errc chan error
|
||||
}
|
||||
|
||||
func (rc *runnerChecker) Check() error {
|
||||
select {
|
||||
case err := <-rc.errc:
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type noChecker struct{}
|
||||
|
||||
func newNoChecker() Checker { return &noChecker{} }
|
||||
func (nc *noChecker) Check() error { return nil }
|
||||
@ -1,261 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// agentConfig holds information needed to interact/configure an agent and its etcd process
|
||||
type agentConfig struct {
|
||||
endpoint string
|
||||
clientPort int
|
||||
advertiseClientPort int
|
||||
peerPort int
|
||||
advertisePeerPort int
|
||||
failpointPort int
|
||||
}
|
||||
|
||||
type cluster struct {
|
||||
agents []agentConfig
|
||||
Size int
|
||||
Members []*member
|
||||
}
|
||||
|
||||
type ClusterStatus struct {
|
||||
AgentStatuses map[string]client.Status
|
||||
}
|
||||
|
||||
func (c *cluster) bootstrap() error {
|
||||
size := len(c.agents)
|
||||
|
||||
members := make([]*member, size)
|
||||
memberNameURLs := make([]string, size)
|
||||
for i, a := range c.agents {
|
||||
agent, err := client.NewAgent(a.endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
host, _, err := net.SplitHostPort(a.endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
members[i] = &member{
|
||||
Agent: agent,
|
||||
Endpoint: a.endpoint,
|
||||
Name: fmt.Sprintf("etcd-%d", i),
|
||||
ClientURL: fmt.Sprintf("http://%s:%d", host, a.clientPort),
|
||||
AdvertiseClientURL: fmt.Sprintf("http://%s:%d", host, a.advertiseClientPort),
|
||||
PeerURL: fmt.Sprintf("http://%s:%d", host, a.peerPort),
|
||||
AdvertisePeerURL: fmt.Sprintf("http://%s:%d", host, a.advertisePeerPort),
|
||||
FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort),
|
||||
}
|
||||
memberNameURLs[i] = members[i].ClusterEntry()
|
||||
}
|
||||
clusterStr := strings.Join(memberNameURLs, ",")
|
||||
token := fmt.Sprint(rand.Int())
|
||||
|
||||
for i, m := range members {
|
||||
flags := append(
|
||||
m.Flags(),
|
||||
"--initial-cluster-token", token,
|
||||
"--initial-cluster", clusterStr,
|
||||
)
|
||||
|
||||
if _, err := m.Agent.Start(flags...); err != nil {
|
||||
// cleanup
|
||||
for _, m := range members[:i] {
|
||||
m.Agent.Terminate()
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.Size = size
|
||||
c.Members = members
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cluster) Reset() error { return c.bootstrap() }
|
||||
|
||||
func (c *cluster) WaitHealth() error {
|
||||
var err error
|
||||
// wait 60s to check cluster health.
|
||||
// TODO: set it to a reasonable value. It is set that high because
|
||||
// follower may use long time to catch up the leader when reboot under
|
||||
// reasonable workload (https://github.com/coreos/etcd/issues/2698)
|
||||
for i := 0; i < 60; i++ {
|
||||
for _, m := range c.Members {
|
||||
if err = m.SetHealthKeyV3(); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
plog.Warningf("#%d setHealthKey error (%v)", i, err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// GetLeader returns the index of leader and error if any.
|
||||
func (c *cluster) GetLeader() (int, error) {
|
||||
for i, m := range c.Members {
|
||||
isLeader, err := m.IsLeader()
|
||||
if isLeader || err != nil {
|
||||
return i, err
|
||||
}
|
||||
}
|
||||
return 0, fmt.Errorf("no leader found")
|
||||
}
|
||||
|
||||
func (c *cluster) Cleanup() error {
|
||||
var lasterr error
|
||||
for _, m := range c.Members {
|
||||
if err := m.Agent.Cleanup(); err != nil {
|
||||
lasterr = err
|
||||
}
|
||||
}
|
||||
return lasterr
|
||||
}
|
||||
|
||||
func (c *cluster) Terminate() {
|
||||
for _, m := range c.Members {
|
||||
m.Agent.Terminate()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cluster) Status() ClusterStatus {
|
||||
cs := ClusterStatus{
|
||||
AgentStatuses: make(map[string]client.Status),
|
||||
}
|
||||
|
||||
for _, m := range c.Members {
|
||||
s, err := m.Agent.Status()
|
||||
// TODO: add a.Desc() as a key of the map
|
||||
desc := m.Endpoint
|
||||
if err != nil {
|
||||
cs.AgentStatuses[desc] = client.Status{State: "unknown"}
|
||||
plog.Printf("failed to get the status of agent [%s]", desc)
|
||||
}
|
||||
cs.AgentStatuses[desc] = s
|
||||
}
|
||||
return cs
|
||||
}
|
||||
|
||||
// maxRev returns the maximum revision found on the cluster.
|
||||
func (c *cluster) maxRev() (rev int64, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
|
||||
defer cancel()
|
||||
revc, errc := make(chan int64, len(c.Members)), make(chan error, len(c.Members))
|
||||
for i := range c.Members {
|
||||
go func(m *member) {
|
||||
mrev, merr := m.Rev(ctx)
|
||||
revc <- mrev
|
||||
errc <- merr
|
||||
}(c.Members[i])
|
||||
}
|
||||
for i := 0; i < len(c.Members); i++ {
|
||||
if merr := <-errc; merr != nil {
|
||||
err = merr
|
||||
}
|
||||
if mrev := <-revc; mrev > rev {
|
||||
rev = mrev
|
||||
}
|
||||
}
|
||||
return rev, err
|
||||
}
|
||||
|
||||
func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
|
||||
revs := make(map[string]int64)
|
||||
hashes := make(map[string]int64)
|
||||
for _, m := range c.Members {
|
||||
rev, hash, err := m.RevHash()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
revs[m.ClientURL] = rev
|
||||
hashes[m.ClientURL] = hash
|
||||
}
|
||||
return revs, hashes, nil
|
||||
}
|
||||
|
||||
func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) {
|
||||
if rev <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, m := range c.Members {
|
||||
u := m.ClientURL
|
||||
conn, derr := m.dialGRPC()
|
||||
if derr != nil {
|
||||
plog.Printf("[compact kv #%d] dial error %v (endpoint %s)", i, derr, u)
|
||||
err = derr
|
||||
continue
|
||||
}
|
||||
kvc := pb.NewKVClient(conn)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
plog.Printf("[compact kv #%d] starting (endpoint %s)", i, u)
|
||||
_, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false))
|
||||
cancel()
|
||||
conn.Close()
|
||||
succeed := true
|
||||
if cerr != nil {
|
||||
if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
|
||||
plog.Printf("[compact kv #%d] already compacted (endpoint %s)", i, u)
|
||||
} else {
|
||||
plog.Warningf("[compact kv #%d] error %v (endpoint %s)", i, cerr, u)
|
||||
err = cerr
|
||||
succeed = false
|
||||
}
|
||||
}
|
||||
if succeed {
|
||||
plog.Printf("[compact kv #%d] done (endpoint %s)", i, u)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *cluster) checkCompact(rev int64) error {
|
||||
if rev == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, m := range c.Members {
|
||||
if err := m.CheckCompact(rev); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cluster) defrag() error {
|
||||
for _, m := range c.Members {
|
||||
if err := m.Defrag(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1,16 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// etcd-tester is a single controller for all etcd-agents to manage an etcd cluster and simulate failures.
|
||||
package main
|
||||
@ -1,97 +0,0 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type runnerStresser struct {
|
||||
cmd *exec.Cmd
|
||||
cmdStr string
|
||||
args []string
|
||||
rl *rate.Limiter
|
||||
reqRate int
|
||||
|
||||
errc chan error
|
||||
donec chan struct{}
|
||||
}
|
||||
|
||||
func newRunnerStresser(cmdStr string, args []string, rl *rate.Limiter, reqRate int) *runnerStresser {
|
||||
rl.SetLimit(rl.Limit() - rate.Limit(reqRate))
|
||||
return &runnerStresser{
|
||||
cmdStr: cmdStr,
|
||||
args: args,
|
||||
rl: rl,
|
||||
reqRate: reqRate,
|
||||
errc: make(chan error, 1),
|
||||
donec: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *runnerStresser) setupOnce() (err error) {
|
||||
if rs.cmd != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
rs.cmd = exec.Command(rs.cmdStr, rs.args...)
|
||||
stderr, err := rs.cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(rs.donec)
|
||||
out, err := ioutil.ReadAll(stderr)
|
||||
if err != nil {
|
||||
rs.errc <- err
|
||||
} else {
|
||||
rs.errc <- fmt.Errorf("(%v %v) stderr %v", rs.cmdStr, rs.args, string(out))
|
||||
}
|
||||
}()
|
||||
|
||||
return rs.cmd.Start()
|
||||
}
|
||||
|
||||
func (rs *runnerStresser) Stress() (err error) {
|
||||
if err = rs.setupOnce(); err != nil {
|
||||
return err
|
||||
}
|
||||
return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT)
|
||||
}
|
||||
|
||||
func (rs *runnerStresser) Pause() {
|
||||
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP)
|
||||
}
|
||||
|
||||
func (rs *runnerStresser) Close() {
|
||||
syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT)
|
||||
rs.cmd.Wait()
|
||||
<-rs.donec
|
||||
rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate))
|
||||
}
|
||||
|
||||
func (rs *runnerStresser) ModifiedKeys() int64 {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (rs *runnerStresser) Checker() Checker {
|
||||
return &runnerChecker{rs.errc}
|
||||
}
|
||||
@ -1,160 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type failpointStats struct {
|
||||
// crashes counts the number of crashes for a failpoint
|
||||
crashes map[string]int
|
||||
// mu protects crashes
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
var fpStats failpointStats
|
||||
|
||||
func failpointFailures(c *cluster, failpoints []string) (ret []failure, err error) {
|
||||
var fps []string
|
||||
fps, err = failpointPaths(c.Members[0].FailpointURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// create failure objects for all failpoints
|
||||
for _, fp := range fps {
|
||||
if len(fp) == 0 {
|
||||
continue
|
||||
}
|
||||
fpFails := failuresFromFailpoint(fp, failpoints)
|
||||
// wrap in delays so failpoint has time to trigger
|
||||
for i, fpf := range fpFails {
|
||||
if strings.Contains(fp, "Snap") {
|
||||
// hack to trigger snapshot failpoints
|
||||
fpFails[i] = &failureUntilSnapshot{fpf}
|
||||
} else {
|
||||
fpFails[i] = &failureDelay{fpf, 3 * time.Second}
|
||||
}
|
||||
}
|
||||
ret = append(ret, fpFails...)
|
||||
}
|
||||
fpStats.crashes = make(map[string]int)
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func failpointPaths(endpoint string) ([]string, error) {
|
||||
resp, err := http.Get(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, rerr := ioutil.ReadAll(resp.Body)
|
||||
if rerr != nil {
|
||||
return nil, rerr
|
||||
}
|
||||
var fps []string
|
||||
for _, l := range strings.Split(string(body), "\n") {
|
||||
fp := strings.Split(l, "=")[0]
|
||||
fps = append(fps, fp)
|
||||
}
|
||||
return fps, nil
|
||||
}
|
||||
|
||||
// failpoints follows FreeBSD KFAIL_POINT syntax.
|
||||
// e.g. panic("etcd-tester"),1*sleep(1000)->panic("etcd-tester")
|
||||
func failuresFromFailpoint(fp string, failpoints []string) (fs []failure) {
|
||||
recov := makeRecoverFailpoint(fp)
|
||||
for _, failpoint := range failpoints {
|
||||
inject := makeInjectFailpoint(fp, failpoint)
|
||||
fs = append(fs, []failure{
|
||||
&failureOne{
|
||||
description: description(fmt.Sprintf("failpoint %s (one: %s)", fp, failpoint)),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
&failureAll{
|
||||
description: description(fmt.Sprintf("failpoint %s (all: %s)", fp, failpoint)),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
&failureMajority{
|
||||
description: description(fmt.Sprintf("failpoint %s (majority: %s)", fp, failpoint)),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
&failureLeader{
|
||||
failureByFunc{
|
||||
description: description(fmt.Sprintf("failpoint %s (leader: %s)", fp, failpoint)),
|
||||
injectMember: inject,
|
||||
recoverMember: recov,
|
||||
},
|
||||
0,
|
||||
},
|
||||
}...)
|
||||
}
|
||||
return fs
|
||||
}
|
||||
|
||||
func makeInjectFailpoint(fp, val string) injectMemberFunc {
|
||||
return func(m *member) (err error) {
|
||||
return putFailpoint(m.FailpointURL, fp, val)
|
||||
}
|
||||
}
|
||||
|
||||
func makeRecoverFailpoint(fp string) recoverMemberFunc {
|
||||
return func(m *member) error {
|
||||
if err := delFailpoint(m.FailpointURL, fp); err == nil {
|
||||
return nil
|
||||
}
|
||||
// node not responding, likely dead from fp panic; restart
|
||||
fpStats.mu.Lock()
|
||||
fpStats.crashes[fp]++
|
||||
fpStats.mu.Unlock()
|
||||
return recoverStop(m)
|
||||
}
|
||||
}
|
||||
|
||||
func putFailpoint(ep, fp, val string) error {
|
||||
req, _ := http.NewRequest(http.MethodPut, ep+"/"+fp, strings.NewReader(val))
|
||||
c := http.Client{}
|
||||
resp, err := c.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("failed to PUT %s=%s at %s (%v)", fp, val, ep, resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func delFailpoint(ep, fp string) error {
|
||||
req, _ := http.NewRequest(http.MethodDelete, ep+"/"+fp, strings.NewReader(""))
|
||||
c := http.Client{}
|
||||
resp, err := c.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("failed to DELETE %s at %s (%v)", fp, ep, resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1,205 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
type failure interface {
|
||||
// Inject injeccts the failure into the testing cluster at the given
|
||||
// round. When calling the function, the cluster should be in health.
|
||||
Inject(c *cluster, round int) error
|
||||
// Recover recovers the injected failure caused by the injection of the
|
||||
// given round and wait for the recovery of the testing cluster.
|
||||
Recover(c *cluster, round int) error
|
||||
// Desc returns a description of the failure
|
||||
Desc() string
|
||||
}
|
||||
|
||||
type description string
|
||||
|
||||
func (d description) Desc() string { return string(d) }
|
||||
|
||||
type injectMemberFunc func(*member) error
|
||||
type recoverMemberFunc func(*member) error
|
||||
|
||||
type failureByFunc struct {
|
||||
description
|
||||
injectMember injectMemberFunc
|
||||
recoverMember recoverMemberFunc
|
||||
}
|
||||
|
||||
type failureOne failureByFunc
|
||||
type failureAll failureByFunc
|
||||
type failureMajority failureByFunc
|
||||
type failureLeader struct {
|
||||
failureByFunc
|
||||
idx int
|
||||
}
|
||||
|
||||
type failureDelay struct {
|
||||
failure
|
||||
delayDuration time.Duration
|
||||
}
|
||||
|
||||
// failureUntilSnapshot injects a failure and waits for a snapshot event
|
||||
type failureUntilSnapshot struct{ failure }
|
||||
|
||||
func (f *failureOne) Inject(c *cluster, round int) error {
|
||||
return f.injectMember(c.Members[round%c.Size])
|
||||
}
|
||||
|
||||
func (f *failureOne) Recover(c *cluster, round int) error {
|
||||
if err := f.recoverMember(c.Members[round%c.Size]); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.WaitHealth()
|
||||
}
|
||||
|
||||
func (f *failureAll) Inject(c *cluster, round int) error {
|
||||
for _, m := range c.Members {
|
||||
if err := f.injectMember(m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *failureAll) Recover(c *cluster, round int) error {
|
||||
for _, m := range c.Members {
|
||||
if err := f.recoverMember(m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return c.WaitHealth()
|
||||
}
|
||||
|
||||
func (f *failureMajority) Inject(c *cluster, round int) error {
|
||||
for i := range killMap(c.Size, round) {
|
||||
if err := f.injectMember(c.Members[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *failureMajority) Recover(c *cluster, round int) error {
|
||||
for i := range killMap(c.Size, round) {
|
||||
if err := f.recoverMember(c.Members[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *failureLeader) Inject(c *cluster, round int) error {
|
||||
idx, err := c.GetLeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.idx = idx
|
||||
return f.injectMember(c.Members[idx])
|
||||
}
|
||||
|
||||
func (f *failureLeader) Recover(c *cluster, round int) error {
|
||||
if err := f.recoverMember(c.Members[f.idx]); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.WaitHealth()
|
||||
}
|
||||
|
||||
func (f *failureDelay) Inject(c *cluster, round int) error {
|
||||
if err := f.failure.Inject(c, round); err != nil {
|
||||
return err
|
||||
}
|
||||
if f.delayDuration > 0 {
|
||||
plog.Infof("sleeping delay duration %v for %q", f.delayDuration, f.failure.Desc())
|
||||
time.Sleep(f.delayDuration)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *failureUntilSnapshot) Inject(c *cluster, round int) error {
|
||||
if err := f.failure.Inject(c, round); err != nil {
|
||||
return err
|
||||
}
|
||||
if c.Size < 3 {
|
||||
return nil
|
||||
}
|
||||
// maxRev may fail since failure just injected, retry if failed.
|
||||
startRev, err := c.maxRev()
|
||||
for i := 0; i < 10 && startRev == 0; i++ {
|
||||
startRev, err = c.maxRev()
|
||||
}
|
||||
if startRev == 0 {
|
||||
return err
|
||||
}
|
||||
lastRev := startRev
|
||||
// Normal healthy cluster could accept 1000req/s at least.
|
||||
// Give it 3-times time to create a new snapshot.
|
||||
retry := snapshotCount / 1000 * 3
|
||||
for j := 0; j < retry; j++ {
|
||||
lastRev, _ = c.maxRev()
|
||||
// If the number of proposals committed is bigger than snapshot count,
|
||||
// a new snapshot should have been created.
|
||||
if lastRev-startRev > snapshotCount {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry)
|
||||
}
|
||||
|
||||
func (f *failureUntilSnapshot) Desc() string {
|
||||
return f.failure.Desc() + " for a long time and expect it to recover from an incoming snapshot"
|
||||
}
|
||||
|
||||
func killMap(size int, seed int) map[int]bool {
|
||||
m := make(map[int]bool)
|
||||
r := rand.New(rand.NewSource(int64(seed)))
|
||||
majority := size/2 + 1
|
||||
for {
|
||||
m[r.Intn(size)] = true
|
||||
if len(m) >= majority {
|
||||
return m
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type failureNop failureByFunc
|
||||
|
||||
func (f *failureNop) Inject(c *cluster, round int) error { return nil }
|
||||
func (f *failureNop) Recover(c *cluster, round int) error { return nil }
|
||||
|
||||
type failureExternal struct {
|
||||
failure
|
||||
|
||||
description string
|
||||
scriptPath string
|
||||
}
|
||||
|
||||
func (f *failureExternal) Inject(c *cluster, round int) error {
|
||||
return exec.Command(f.scriptPath, "enable", fmt.Sprintf("%d", round)).Run()
|
||||
}
|
||||
|
||||
func (f *failureExternal) Recover(c *cluster, round int) error {
|
||||
return exec.Command(f.scriptPath, "disable", fmt.Sprintf("%d", round)).Run()
|
||||
}
|
||||
|
||||
func (f *failureExternal) Desc() string { return f.description }
|
||||
@ -1,177 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
snapshotCount = 10000
|
||||
slowNetworkLatency = 500 // 500 millisecond
|
||||
randomVariation = 50
|
||||
|
||||
// delay duration to trigger leader election (default election timeout 1s)
|
||||
triggerElectionDur = 5 * time.Second
|
||||
|
||||
// Wait more when it recovers from slow network, because network layer
|
||||
// needs extra time to propagate traffic control (tc command) change.
|
||||
// Otherwise, we get different hash values from the previous revision.
|
||||
// For more detail, please see https://github.com/coreos/etcd/issues/5121.
|
||||
waitRecover = 5 * time.Second
|
||||
)
|
||||
|
||||
func injectStop(m *member) error { return m.Agent.Stop() }
|
||||
func recoverStop(m *member) error {
|
||||
_, err := m.Agent.Restart()
|
||||
return err
|
||||
}
|
||||
|
||||
func newFailureKillAll() failure {
|
||||
return &failureAll{
|
||||
description: "kill all members",
|
||||
injectMember: injectStop,
|
||||
recoverMember: recoverStop,
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureKillMajority() failure {
|
||||
return &failureMajority{
|
||||
description: "kill majority of the cluster",
|
||||
injectMember: injectStop,
|
||||
recoverMember: recoverStop,
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureKillOne() failure {
|
||||
return &failureOne{
|
||||
description: "kill one random member",
|
||||
injectMember: injectStop,
|
||||
recoverMember: recoverStop,
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureKillLeader() failure {
|
||||
ff := failureByFunc{
|
||||
description: "kill leader member",
|
||||
injectMember: injectStop,
|
||||
recoverMember: recoverStop,
|
||||
}
|
||||
return &failureLeader{ff, 0}
|
||||
}
|
||||
|
||||
func newFailureKillOneForLongTime() failure {
|
||||
return &failureUntilSnapshot{newFailureKillOne()}
|
||||
}
|
||||
|
||||
func newFailureKillLeaderForLongTime() failure {
|
||||
return &failureUntilSnapshot{newFailureKillLeader()}
|
||||
}
|
||||
|
||||
func injectDropPort(m *member) error { return m.Agent.DropPort(m.peerPort()) }
|
||||
func recoverDropPort(m *member) error { return m.Agent.RecoverPort(m.peerPort()) }
|
||||
|
||||
func newFailureIsolate() failure {
|
||||
f := &failureOne{
|
||||
description: "isolate one member",
|
||||
injectMember: injectDropPort,
|
||||
recoverMember: recoverDropPort,
|
||||
}
|
||||
return &failureDelay{
|
||||
failure: f,
|
||||
delayDuration: triggerElectionDur,
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureIsolateAll() failure {
|
||||
f := &failureAll{
|
||||
description: "isolate all members",
|
||||
injectMember: injectDropPort,
|
||||
recoverMember: recoverDropPort,
|
||||
}
|
||||
return &failureDelay{
|
||||
failure: f,
|
||||
delayDuration: triggerElectionDur,
|
||||
}
|
||||
}
|
||||
|
||||
func injectLatency(m *member) error {
|
||||
if err := m.Agent.SetLatency(slowNetworkLatency, randomVariation); err != nil {
|
||||
m.Agent.RemoveLatency()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func recoverLatency(m *member) error {
|
||||
if err := m.Agent.RemoveLatency(); err != nil {
|
||||
return err
|
||||
}
|
||||
time.Sleep(waitRecover)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newFailureSlowNetworkOneMember() failure {
|
||||
desc := fmt.Sprintf("slow down one member's network by adding %d ms latency", slowNetworkLatency)
|
||||
f := &failureOne{
|
||||
description: description(desc),
|
||||
injectMember: injectLatency,
|
||||
recoverMember: recoverLatency,
|
||||
}
|
||||
return &failureDelay{
|
||||
failure: f,
|
||||
delayDuration: triggerElectionDur,
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureSlowNetworkLeader() failure {
|
||||
desc := fmt.Sprintf("slow down leader's network by adding %d ms latency", slowNetworkLatency)
|
||||
ff := failureByFunc{
|
||||
description: description(desc),
|
||||
injectMember: injectLatency,
|
||||
recoverMember: recoverLatency,
|
||||
}
|
||||
f := &failureLeader{ff, 0}
|
||||
return &failureDelay{
|
||||
failure: f,
|
||||
delayDuration: triggerElectionDur,
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureSlowNetworkAll() failure {
|
||||
f := &failureAll{
|
||||
description: "slow down all members' network",
|
||||
injectMember: injectLatency,
|
||||
recoverMember: recoverLatency,
|
||||
}
|
||||
return &failureDelay{
|
||||
failure: f,
|
||||
delayDuration: triggerElectionDur,
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureNop() failure {
|
||||
return &failureNop{
|
||||
description: "no failure",
|
||||
}
|
||||
}
|
||||
|
||||
func newFailureExternal(scriptPath string) failure {
|
||||
return &failureExternal{
|
||||
description: fmt.Sprintf("external fault injector (script: %s)", scriptPath),
|
||||
scriptPath: scriptPath,
|
||||
}
|
||||
}
|
||||
@ -1,44 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type statusHandler struct {
|
||||
status *Status
|
||||
}
|
||||
|
||||
func (sh statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
en := json.NewEncoder(w)
|
||||
|
||||
sh.status.mu.Lock()
|
||||
defer sh.status.mu.Unlock()
|
||||
|
||||
if err := en.Encode(Status{
|
||||
Since: sh.status.Since,
|
||||
Failures: sh.status.Failures,
|
||||
RoundLimit: sh.status.RoundLimit,
|
||||
Cluster: sh.status.cluster.Status(),
|
||||
cluster: sh.status.cluster,
|
||||
Round: sh.status.Round,
|
||||
Case: sh.status.Case,
|
||||
}); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
@ -1,331 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
type keyStresser struct {
|
||||
Endpoint string
|
||||
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
keyTxnSuffixRange int
|
||||
keyTxnOps int
|
||||
|
||||
N int
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
cancel func()
|
||||
conn *grpc.ClientConn
|
||||
// atomicModifiedKeys records the number of keys created and deleted by the stresser.
|
||||
atomicModifiedKeys int64
|
||||
|
||||
stressTable *stressTable
|
||||
}
|
||||
|
||||
func (s *keyStresser) Stress() error {
|
||||
// TODO: add backoff option
|
||||
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (%s)", err, s.Endpoint)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
s.wg.Add(s.N)
|
||||
s.conn = conn
|
||||
s.cancel = cancel
|
||||
|
||||
kvc := pb.NewKVClient(conn)
|
||||
|
||||
var stressEntries = []stressEntry{
|
||||
{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
|
||||
{
|
||||
weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
|
||||
f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
|
||||
},
|
||||
{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
|
||||
}
|
||||
if s.keyTxnSuffixRange > 0 {
|
||||
// adjust to make up ±70% of workloads with writes
|
||||
stressEntries[0].weight = 0.35
|
||||
stressEntries = append(stressEntries, stressEntry{
|
||||
weight: 0.35,
|
||||
f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
|
||||
})
|
||||
}
|
||||
s.stressTable = createStressTable(stressEntries)
|
||||
|
||||
for i := 0; i < s.N; i++ {
|
||||
go s.run(ctx)
|
||||
}
|
||||
|
||||
plog.Infof("keyStresser %q is started", s.Endpoint)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *keyStresser) run(ctx context.Context) {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: 10-second is enough timeout to cover leader failure
|
||||
// and immediate leader election. Find out what other cases this
|
||||
// could be timed out.
|
||||
sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
err, modifiedKeys := s.stressTable.choose()(sctx)
|
||||
scancel()
|
||||
if err == nil {
|
||||
atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
|
||||
continue
|
||||
}
|
||||
|
||||
switch rpctypes.ErrorDesc(err) {
|
||||
case context.DeadlineExceeded.Error():
|
||||
// This retries when request is triggered at the same time as
|
||||
// leader failure. When we terminate the leader, the request to
|
||||
// that leader cannot be processed, and times out. Also requests
|
||||
// to followers cannot be forwarded to the old leader, so timing out
|
||||
// as well. We want to keep stressing until the cluster elects a
|
||||
// new leader and start processing requests again.
|
||||
case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
|
||||
// This retries when request is triggered at the same time as
|
||||
// leader failure and follower nodes receive time out errors
|
||||
// from losing their leader. Followers should retry to connect
|
||||
// to the new leader.
|
||||
case etcdserver.ErrStopped.Error():
|
||||
// one of the etcd nodes stopped from failure injection
|
||||
case transport.ErrConnClosing.Desc:
|
||||
// server closed the transport (failure injected node)
|
||||
case rpctypes.ErrNotCapable.Error():
|
||||
// capability check has not been done (in the beginning)
|
||||
case rpctypes.ErrTooManyRequests.Error():
|
||||
// hitting the recovering member.
|
||||
case context.Canceled.Error():
|
||||
// from stresser.Cancel method:
|
||||
return
|
||||
case grpc.ErrClientConnClosing.Error():
|
||||
// from stresser.Cancel method:
|
||||
return
|
||||
default:
|
||||
plog.Errorf("keyStresser %v exited with error (%v)", s.Endpoint, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *keyStresser) Pause() {
|
||||
s.Close()
|
||||
}
|
||||
|
||||
func (s *keyStresser) Close() {
|
||||
s.cancel()
|
||||
s.conn.Close()
|
||||
s.wg.Wait()
|
||||
plog.Infof("keyStresser %q is closed", s.Endpoint)
|
||||
|
||||
}
|
||||
|
||||
func (s *keyStresser) ModifiedKeys() int64 {
|
||||
return atomic.LoadInt64(&s.atomicModifiedKeys)
|
||||
}
|
||||
|
||||
func (s *keyStresser) Checker() Checker { return nil }
|
||||
|
||||
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
|
||||
|
||||
type stressEntry struct {
|
||||
weight float32
|
||||
f stressFunc
|
||||
}
|
||||
|
||||
type stressTable struct {
|
||||
entries []stressEntry
|
||||
sumWeights float32
|
||||
}
|
||||
|
||||
func createStressTable(entries []stressEntry) *stressTable {
|
||||
st := stressTable{entries: entries}
|
||||
for _, entry := range st.entries {
|
||||
st.sumWeights += entry.weight
|
||||
}
|
||||
return &st
|
||||
}
|
||||
|
||||
func (st *stressTable) choose() stressFunc {
|
||||
v := rand.Float32() * st.sumWeights
|
||||
var sum float32
|
||||
var idx int
|
||||
for i := range st.entries {
|
||||
sum += st.entries[i].weight
|
||||
if sum >= v {
|
||||
idx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
return st.entries[idx].f
|
||||
}
|
||||
|
||||
func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
_, err := kvc.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
Value: randBytes(keySize),
|
||||
}, grpc.FailFast(false))
|
||||
return err, 1
|
||||
}
|
||||
}
|
||||
|
||||
func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
|
||||
keys := make([]string, keyTxnSuffixRange)
|
||||
for i := range keys {
|
||||
keys[i] = fmt.Sprintf("/k%03d", i)
|
||||
}
|
||||
return writeTxn(kvc, keys, txnOps)
|
||||
}
|
||||
|
||||
func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
ks := make(map[string]struct{}, txnOps)
|
||||
for len(ks) != txnOps {
|
||||
ks[keys[rand.Intn(len(keys))]] = struct{}{}
|
||||
}
|
||||
selected := make([]string, 0, txnOps)
|
||||
for k := range ks {
|
||||
selected = append(selected, k)
|
||||
}
|
||||
com, delOp, putOp := getTxnReqs(selected[0], "bar00")
|
||||
txnReq := &pb.TxnRequest{
|
||||
Compare: []*pb.Compare{com},
|
||||
Success: []*pb.RequestOp{delOp},
|
||||
Failure: []*pb.RequestOp{putOp},
|
||||
}
|
||||
|
||||
// add nested txns if any
|
||||
for i := 1; i < txnOps; i++ {
|
||||
k, v := selected[i], fmt.Sprintf("bar%02d", i)
|
||||
com, delOp, putOp = getTxnReqs(k, v)
|
||||
nested := &pb.RequestOp{
|
||||
Request: &pb.RequestOp_RequestTxn{
|
||||
RequestTxn: &pb.TxnRequest{
|
||||
Compare: []*pb.Compare{com},
|
||||
Success: []*pb.RequestOp{delOp},
|
||||
Failure: []*pb.RequestOp{putOp},
|
||||
},
|
||||
},
|
||||
}
|
||||
txnReq.Success = append(txnReq.Success, nested)
|
||||
txnReq.Failure = append(txnReq.Failure, nested)
|
||||
}
|
||||
|
||||
_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
|
||||
return err, int64(txnOps)
|
||||
}
|
||||
}
|
||||
|
||||
func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
|
||||
// if key exists (version > 0)
|
||||
com = &pb.Compare{
|
||||
Key: []byte(key),
|
||||
Target: pb.Compare_VERSION,
|
||||
Result: pb.Compare_GREATER,
|
||||
TargetUnion: &pb.Compare_Version{Version: 0},
|
||||
}
|
||||
delOp = &pb.RequestOp{
|
||||
Request: &pb.RequestOp_RequestDeleteRange{
|
||||
RequestDeleteRange: &pb.DeleteRangeRequest{
|
||||
Key: []byte(key),
|
||||
},
|
||||
},
|
||||
}
|
||||
putOp = &pb.RequestOp{
|
||||
Request: &pb.RequestOp_RequestPut{
|
||||
RequestPut: &pb.PutRequest{
|
||||
Key: []byte(key),
|
||||
Value: []byte(val),
|
||||
},
|
||||
},
|
||||
}
|
||||
return com, delOp, putOp
|
||||
}
|
||||
|
||||
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err, 0
|
||||
}
|
||||
}
|
||||
|
||||
func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
start := rand.Intn(keySuffixRange)
|
||||
end := start + 500
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", start)),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
|
||||
}, grpc.FailFast(false))
|
||||
return err, 0
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err, 1
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) (error, int64) {
|
||||
start := rand.Intn(keySuffixRange)
|
||||
end := start + 500
|
||||
resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", start)),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
|
||||
}, grpc.FailFast(false))
|
||||
if err == nil {
|
||||
return nil, resp.Deleted
|
||||
}
|
||||
return err, 0
|
||||
}
|
||||
}
|
||||
@ -1,382 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
// time to live for lease
|
||||
TTL = 120
|
||||
TTLShort = 2
|
||||
)
|
||||
|
||||
type leaseStresser struct {
|
||||
endpoint string
|
||||
cancel func()
|
||||
conn *grpc.ClientConn
|
||||
kvc pb.KVClient
|
||||
lc pb.LeaseClient
|
||||
ctx context.Context
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
// atomicModifiedKey records the number of keys created and deleted during a test case
|
||||
atomicModifiedKey int64
|
||||
numLeases int
|
||||
keysPerLease int
|
||||
|
||||
aliveLeases *atomicLeases
|
||||
revokedLeases *atomicLeases
|
||||
shortLivedLeases *atomicLeases
|
||||
|
||||
runWg sync.WaitGroup
|
||||
aliveWg sync.WaitGroup
|
||||
}
|
||||
|
||||
type atomicLeases struct {
|
||||
// rwLock is used to protect read/write access of leases map
|
||||
// which are accessed and modified by different go routines.
|
||||
rwLock sync.RWMutex
|
||||
leases map[int64]time.Time
|
||||
}
|
||||
|
||||
func (al *atomicLeases) add(leaseID int64, t time.Time) {
|
||||
al.rwLock.Lock()
|
||||
al.leases[leaseID] = t
|
||||
al.rwLock.Unlock()
|
||||
}
|
||||
|
||||
func (al *atomicLeases) update(leaseID int64, t time.Time) {
|
||||
al.rwLock.Lock()
|
||||
_, ok := al.leases[leaseID]
|
||||
if ok {
|
||||
al.leases[leaseID] = t
|
||||
}
|
||||
al.rwLock.Unlock()
|
||||
}
|
||||
|
||||
func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
|
||||
al.rwLock.RLock()
|
||||
rv, ok = al.leases[leaseID]
|
||||
al.rwLock.RUnlock()
|
||||
return rv, ok
|
||||
}
|
||||
|
||||
func (al *atomicLeases) remove(leaseID int64) {
|
||||
al.rwLock.Lock()
|
||||
delete(al.leases, leaseID)
|
||||
al.rwLock.Unlock()
|
||||
}
|
||||
|
||||
func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
|
||||
leasesCopy := make(map[int64]time.Time)
|
||||
al.rwLock.RLock()
|
||||
for k, v := range al.leases {
|
||||
leasesCopy[k] = v
|
||||
}
|
||||
al.rwLock.RUnlock()
|
||||
return leasesCopy
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) setupOnce() error {
|
||||
if ls.aliveLeases != nil {
|
||||
return nil
|
||||
}
|
||||
if ls.numLeases == 0 {
|
||||
panic("expect numLeases to be set")
|
||||
}
|
||||
if ls.keysPerLease == 0 {
|
||||
panic("expect keysPerLease to be set")
|
||||
}
|
||||
|
||||
ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) Stress() error {
|
||||
plog.Infof("lease Stresser %v starting ...", ls.endpoint)
|
||||
if err := ls.setupOnce(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (%s)", err, ls.endpoint)
|
||||
}
|
||||
ls.conn = conn
|
||||
ls.kvc = pb.NewKVClient(conn)
|
||||
ls.lc = pb.NewLeaseClient(conn)
|
||||
ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
|
||||
ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ls.cancel = cancel
|
||||
ls.ctx = ctx
|
||||
|
||||
ls.runWg.Add(1)
|
||||
go ls.run()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) run() {
|
||||
defer ls.runWg.Done()
|
||||
ls.restartKeepAlives()
|
||||
for {
|
||||
// the number of keys created and deleted is roughly 2x the number of created keys for an iteration.
|
||||
// the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key.
|
||||
err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
|
||||
if err == context.Canceled {
|
||||
return
|
||||
}
|
||||
plog.Debugf("creating lease on %v", ls.endpoint)
|
||||
ls.createLeases()
|
||||
plog.Debugf("done creating lease on %v", ls.endpoint)
|
||||
plog.Debugf("dropping lease on %v", ls.endpoint)
|
||||
ls.randomlyDropLeases()
|
||||
plog.Debugf("done dropping lease on %v", ls.endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) restartKeepAlives() {
|
||||
for leaseID := range ls.aliveLeases.getLeasesMap() {
|
||||
ls.aliveWg.Add(1)
|
||||
go func(id int64) {
|
||||
ls.keepLeaseAlive(id)
|
||||
}(leaseID)
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) createLeases() {
|
||||
ls.createAliveLeases()
|
||||
ls.createShortLivedLeases()
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) createAliveLeases() {
|
||||
neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < neededLeases; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
leaseID, err := ls.createLeaseWithKeys(TTL)
|
||||
if err != nil {
|
||||
plog.Debugf("lease creation error: (%v)", err)
|
||||
return
|
||||
}
|
||||
ls.aliveLeases.add(leaseID, time.Now())
|
||||
// keep track of all the keep lease alive go routines
|
||||
ls.aliveWg.Add(1)
|
||||
go ls.keepLeaseAlive(leaseID)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) createShortLivedLeases() {
|
||||
// one round of createLeases() might not create all the short lived leases we want due to falures.
|
||||
// thus, we want to create remaining short lived leases in the future round.
|
||||
neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < neededLeases; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
leaseID, err := ls.createLeaseWithKeys(TTLShort)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ls.shortLivedLeases.add(leaseID, time.Now())
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
|
||||
leaseID, err := ls.createLease(ttl)
|
||||
if err != nil {
|
||||
plog.Debugf("lease creation error: (%v)", err)
|
||||
return -1, err
|
||||
}
|
||||
plog.Debugf("lease %v created ", leaseID)
|
||||
if err := ls.attachKeysWithLease(leaseID); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return leaseID, nil
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) randomlyDropLeases() {
|
||||
var wg sync.WaitGroup
|
||||
for l := range ls.aliveLeases.getLeasesMap() {
|
||||
wg.Add(1)
|
||||
go func(leaseID int64) {
|
||||
defer wg.Done()
|
||||
dropped, err := ls.randomlyDropLease(leaseID)
|
||||
// if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
|
||||
// because we can't tell whether the lease is dropped or not.
|
||||
if err != nil {
|
||||
plog.Debugf("drop lease %v has failed error (%v)", leaseID, err)
|
||||
ls.aliveLeases.remove(leaseID)
|
||||
return
|
||||
}
|
||||
if !dropped {
|
||||
return
|
||||
}
|
||||
plog.Debugf("lease %v dropped", leaseID)
|
||||
ls.revokedLeases.add(leaseID, time.Now())
|
||||
ls.aliveLeases.remove(leaseID)
|
||||
}(l)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
|
||||
resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return resp.ID, nil
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
|
||||
defer ls.aliveWg.Done()
|
||||
ctx, cancel := context.WithCancel(ls.ctx)
|
||||
stream, err := ls.lc.LeaseKeepAlive(ctx)
|
||||
defer func() { cancel() }()
|
||||
for {
|
||||
select {
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
case <-ls.ctx.Done():
|
||||
plog.Debugf("keepLeaseAlive lease %v context canceled ", leaseID)
|
||||
// it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
|
||||
// this scenerio is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
|
||||
// to circumvent that scenerio, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
|
||||
// if it is renewed, this means that invariant checking have at least ttl/2 time before lease exipres which is long enough for the checking to finish.
|
||||
// if it is not renewed, we remove the lease from the alive map so that the lease doesn't exipre during invariant checking
|
||||
renewTime, ok := ls.aliveLeases.read(leaseID)
|
||||
if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
|
||||
ls.aliveLeases.remove(leaseID)
|
||||
plog.Debugf("keepLeaseAlive lease %v has not been renewed. drop it.", leaseID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
plog.Debugf("keepLeaseAlive lease %v creates stream error: (%v)", leaseID, err)
|
||||
cancel()
|
||||
ctx, cancel = context.WithCancel(ls.ctx)
|
||||
stream, err = ls.lc.LeaseKeepAlive(ctx)
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
|
||||
plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID)
|
||||
if err != nil {
|
||||
plog.Debugf("keepLeaseAlive stream sends lease %v error (%v)", leaseID, err)
|
||||
continue
|
||||
}
|
||||
leaseRenewTime := time.Now()
|
||||
plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeed", leaseID)
|
||||
respRC, err := stream.Recv()
|
||||
if err != nil {
|
||||
plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v)", leaseID, err)
|
||||
continue
|
||||
}
|
||||
// lease expires after TTL become 0
|
||||
// don't send keepalive if the lease has expired
|
||||
if respRC.TTL <= 0 {
|
||||
plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0", leaseID)
|
||||
ls.aliveLeases.remove(leaseID)
|
||||
return
|
||||
}
|
||||
// renew lease timestamp only if lease is present
|
||||
plog.Debugf("keepLeaseAlive renew lease %v", leaseID)
|
||||
ls.aliveLeases.update(leaseID, leaseRenewTime)
|
||||
}
|
||||
}
|
||||
|
||||
// attachKeysWithLease function attaches keys to the lease.
|
||||
// the format of key is the concat of leaseID + '_' + '<order of key creation>'
|
||||
// e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key
|
||||
func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
|
||||
var txnPuts []*pb.RequestOp
|
||||
for j := 0; j < ls.keysPerLease; j++ {
|
||||
txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)),
|
||||
Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}}
|
||||
txnPuts = append(txnPuts, txnput)
|
||||
}
|
||||
// keep retrying until lease is not found or ctx is being canceled
|
||||
for ls.ctx.Err() == nil {
|
||||
txn := &pb.TxnRequest{Success: txnPuts}
|
||||
_, err := ls.kvc.Txn(ls.ctx, txn)
|
||||
if err == nil {
|
||||
// since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
|
||||
atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
|
||||
return nil
|
||||
}
|
||||
if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return ls.ctx.Err()
|
||||
}
|
||||
|
||||
// randomlyDropLease drops the lease only when the rand.Int(2) returns 1.
|
||||
// This creates a 50/50 percents chance of dropping a lease
|
||||
func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
|
||||
if rand.Intn(2) != 0 {
|
||||
return false, nil
|
||||
}
|
||||
// keep retrying until a lease is dropped or ctx is being canceled
|
||||
for ls.ctx.Err() == nil {
|
||||
_, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
|
||||
if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
plog.Debugf("randomlyDropLease error: (%v)", ls.ctx.Err())
|
||||
return false, ls.ctx.Err()
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) Pause() {
|
||||
ls.Close()
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) Close() {
|
||||
plog.Debugf("lease stresser %q is closing...", ls.endpoint)
|
||||
ls.cancel()
|
||||
ls.runWg.Wait()
|
||||
ls.aliveWg.Wait()
|
||||
ls.conn.Close()
|
||||
plog.Infof("lease stresser %q is closed", ls.endpoint)
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) ModifiedKeys() int64 {
|
||||
return atomic.LoadInt64(&ls.atomicModifiedKey)
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) Checker() Checker { return &leaseChecker{endpoint: ls.endpoint, ls: ls} }
|
||||
@ -1,232 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/pkg/debugutil"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester")
|
||||
|
||||
const (
|
||||
defaultClientPort = 2379
|
||||
defaultPeerPort = 2380
|
||||
defaultFailpointPort = 2381
|
||||
)
|
||||
|
||||
func main() {
|
||||
endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.")
|
||||
clientPorts := flag.String("client-ports", "", "etcd client port for each agent endpoint")
|
||||
advertiseClientPorts := flag.String("advertise-client-ports", "", "etcd advertise client port for each agent endpoint")
|
||||
peerPorts := flag.String("peer-ports", "", "etcd peer port for each agent endpoint")
|
||||
advertisePeerPorts := flag.String("advertise-peer-ports", "", "etcd advertise peer port for each agent endpoint")
|
||||
failpointPorts := flag.String("failpoint-ports", "", "etcd failpoint port for each agent endpoint")
|
||||
|
||||
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
|
||||
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
|
||||
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
|
||||
stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
|
||||
stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
|
||||
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
|
||||
exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
|
||||
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
|
||||
schedCases := flag.String("schedule-cases", "", "test case schedule")
|
||||
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
|
||||
stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressing clients (keys, lease, v2keys, nop, election-runner, watch-runner, lock-racer-runner, lease-runner).")
|
||||
etcdRunnerPath := flag.String("etcd-runner", "", "specify a path of etcd runner binary")
|
||||
failureTypes := flag.String("failures", "default,failpoints", "specify failures (concat of \"default\" and \"failpoints\").")
|
||||
failpoints := flag.String("failpoints", `panic("etcd-tester")`, `comma separated list of failpoint terms to inject (e.g. 'panic("etcd-tester"),1*sleep(1000)')`)
|
||||
externalFailures := flag.String("external-failures", "", "specify a path of script for enabling/disabling an external fault injector")
|
||||
enablePprof := flag.Bool("enable-pprof", false, "true to enable pprof")
|
||||
flag.Parse()
|
||||
|
||||
// to discard gRPC-side balancer logs
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
|
||||
|
||||
eps := strings.Split(*endpointStr, ",")
|
||||
cports := portsFromArg(*clientPorts, len(eps), defaultClientPort)
|
||||
acports := portsFromArg(*advertiseClientPorts, len(eps), defaultClientPort)
|
||||
pports := portsFromArg(*peerPorts, len(eps), defaultPeerPort)
|
||||
apports := portsFromArg(*advertisePeerPorts, len(eps), defaultPeerPort)
|
||||
fports := portsFromArg(*failpointPorts, len(eps), defaultFailpointPort)
|
||||
agents := make([]agentConfig, len(eps))
|
||||
|
||||
for i := range eps {
|
||||
agents[i].endpoint = eps[i]
|
||||
agents[i].clientPort = cports[i]
|
||||
agents[i].advertiseClientPort = acports[i]
|
||||
agents[i].peerPort = pports[i]
|
||||
agents[i].advertisePeerPort = apports[i]
|
||||
agents[i].failpointPort = fports[i]
|
||||
}
|
||||
|
||||
c := &cluster{agents: agents}
|
||||
if err := c.bootstrap(); err != nil {
|
||||
plog.Fatal(err)
|
||||
}
|
||||
defer c.Terminate()
|
||||
|
||||
// ensure cluster is fully booted to know failpoints are available
|
||||
c.WaitHealth()
|
||||
|
||||
var failures []failure
|
||||
|
||||
if failureTypes != nil && *failureTypes != "" {
|
||||
types, failpoints := strings.Split(*failureTypes, ","), strings.Split(*failpoints, ",")
|
||||
failures = makeFailures(types, failpoints, c)
|
||||
}
|
||||
|
||||
if externalFailures != nil && *externalFailures != "" {
|
||||
if len(failures) != 0 {
|
||||
plog.Errorf("specify only one of -failures or -external-failures")
|
||||
os.Exit(1)
|
||||
}
|
||||
failures = append(failures, newFailureExternal(*externalFailures))
|
||||
}
|
||||
|
||||
if len(failures) == 0 {
|
||||
plog.Infof("no failures\n")
|
||||
failures = append(failures, newFailureNop())
|
||||
}
|
||||
|
||||
schedule := failures
|
||||
if schedCases != nil && *schedCases != "" {
|
||||
cases := strings.Split(*schedCases, " ")
|
||||
schedule = make([]failure, len(cases))
|
||||
for i := range cases {
|
||||
caseNum := 0
|
||||
n, err := fmt.Sscanf(cases[i], "%d", &caseNum)
|
||||
if n == 0 || err != nil {
|
||||
plog.Fatalf(`couldn't parse case "%s" (%v)`, cases[i], err)
|
||||
}
|
||||
schedule[i] = failures[caseNum]
|
||||
}
|
||||
}
|
||||
|
||||
scfg := stressConfig{
|
||||
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
|
||||
keyLargeSize: int(*stressKeyLargeSize),
|
||||
keySize: int(*stressKeySize),
|
||||
keySuffixRange: int(*stressKeySuffixRange),
|
||||
keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
|
||||
keyTxnOps: int(*stressKeyTxnOps),
|
||||
numLeases: 10,
|
||||
keysPerLease: 10,
|
||||
|
||||
etcdRunnerPath: *etcdRunnerPath,
|
||||
}
|
||||
if scfg.keyTxnSuffixRange > 100 {
|
||||
plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
|
||||
}
|
||||
if scfg.keyTxnOps > 64 {
|
||||
plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
|
||||
}
|
||||
|
||||
t := &tester{
|
||||
failures: schedule,
|
||||
cluster: c,
|
||||
limit: *limit,
|
||||
exitOnFailure: *exitOnFailure,
|
||||
|
||||
scfg: scfg,
|
||||
stresserType: *stresserType,
|
||||
doChecks: *consistencyCheck,
|
||||
}
|
||||
|
||||
sh := statusHandler{status: &t.status}
|
||||
http.Handle("/status", sh)
|
||||
http.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
if *enablePprof {
|
||||
for p, h := range debugutil.PProfHandlers() {
|
||||
http.Handle(p, h)
|
||||
}
|
||||
}
|
||||
|
||||
go func() { plog.Fatal(http.ListenAndServe(":9028", nil)) }()
|
||||
|
||||
t.runLoop()
|
||||
}
|
||||
|
||||
// portsFromArg converts a comma separated list into a slice of ints
|
||||
func portsFromArg(arg string, n, defaultPort int) []int {
|
||||
ret := make([]int, n)
|
||||
if len(arg) == 0 {
|
||||
for i := range ret {
|
||||
ret[i] = defaultPort
|
||||
}
|
||||
return ret
|
||||
}
|
||||
s := strings.Split(arg, ",")
|
||||
if len(s) != n {
|
||||
fmt.Printf("expected %d ports, got %d (%s)\n", n, len(s), arg)
|
||||
os.Exit(1)
|
||||
}
|
||||
for i := range s {
|
||||
if _, err := fmt.Sscanf(s[i], "%d", &ret[i]); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func makeFailures(types, failpoints []string, c *cluster) []failure {
|
||||
var failures []failure
|
||||
for i := range types {
|
||||
switch types[i] {
|
||||
case "default":
|
||||
defaultFailures := []failure{
|
||||
newFailureKillAll(),
|
||||
newFailureKillMajority(),
|
||||
newFailureKillOne(),
|
||||
newFailureKillLeader(),
|
||||
newFailureKillOneForLongTime(),
|
||||
newFailureKillLeaderForLongTime(),
|
||||
newFailureIsolate(),
|
||||
newFailureIsolateAll(),
|
||||
newFailureSlowNetworkOneMember(),
|
||||
newFailureSlowNetworkLeader(),
|
||||
newFailureSlowNetworkAll(),
|
||||
}
|
||||
failures = append(failures, defaultFailures...)
|
||||
|
||||
case "failpoints":
|
||||
fpFailures, fperr := failpointFailures(c, failpoints)
|
||||
if len(fpFailures) == 0 {
|
||||
plog.Infof("no failpoints found (%v)", fperr)
|
||||
}
|
||||
failures = append(failures, fpFailures...)
|
||||
|
||||
default:
|
||||
plog.Errorf("unknown failure: %s\n", types[i])
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
return failures
|
||||
}
|
||||
@ -1,190 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type member struct {
|
||||
Agent client.Agent
|
||||
Endpoint string
|
||||
Name string
|
||||
ClientURL string
|
||||
AdvertiseClientURL string
|
||||
PeerURL string
|
||||
AdvertisePeerURL string
|
||||
FailpointURL string
|
||||
}
|
||||
|
||||
func (m *member) ClusterEntry() string { return m.Name + "=" + m.AdvertisePeerURL }
|
||||
|
||||
func (m *member) Flags() []string {
|
||||
return []string{
|
||||
"--name", m.Name,
|
||||
"--listen-client-urls", m.ClientURL,
|
||||
"--advertise-client-urls", m.AdvertiseClientURL,
|
||||
"--listen-peer-urls", m.PeerURL,
|
||||
"--initial-advertise-peer-urls", m.AdvertisePeerURL,
|
||||
"--initial-cluster-state", "new",
|
||||
"--snapshot-count", "10000",
|
||||
"--pre-vote",
|
||||
"--experimental-initial-corrupt-check",
|
||||
}
|
||||
}
|
||||
|
||||
func (m *member) CheckCompact(rev int64) error {
|
||||
cli, err := m.newClientV3()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (endpoint %s)", err, m.AdvertiseClientURL)
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
|
||||
wr, ok := <-wch
|
||||
cancel()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("watch channel terminated (endpoint %s)", m.AdvertiseClientURL)
|
||||
}
|
||||
if wr.CompactRevision != rev {
|
||||
return fmt.Errorf("got compact revision %v, wanted %v (endpoint %s)", wr.CompactRevision, rev, m.AdvertiseClientURL)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *member) Defrag() error {
|
||||
plog.Printf("defragmenting %s", m.AdvertiseClientURL)
|
||||
cli, err := m.newClientV3()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cli.Close()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
_, err = cli.Defragment(ctx, m.AdvertiseClientURL)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
plog.Printf("defragmented %s", m.AdvertiseClientURL)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *member) RevHash() (int64, int64, error) {
|
||||
conn, err := m.dialGRPC()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
mt := pb.NewMaintenanceClient(conn)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
|
||||
cancel()
|
||||
conn.Close()
|
||||
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
return resp.Header.Revision, int64(resp.Hash), nil
|
||||
}
|
||||
|
||||
func (m *member) Rev(ctx context.Context) (int64, error) {
|
||||
cli, err := m.newClientV3()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer cli.Close()
|
||||
resp, err := cli.Status(ctx, m.AdvertiseClientURL)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return resp.Header.Revision, nil
|
||||
}
|
||||
|
||||
func (m *member) IsLeader() (bool, error) {
|
||||
cli, err := m.newClientV3()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer cli.Close()
|
||||
resp, err := cli.Status(context.Background(), m.AdvertiseClientURL)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return resp.Header.MemberId == resp.Leader, nil
|
||||
}
|
||||
|
||||
func (m *member) SetHealthKeyV3() error {
|
||||
cli, err := m.newClientV3()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL)
|
||||
}
|
||||
defer cli.Close()
|
||||
// give enough time-out in case expensive requests (range/delete) are pending
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_, err = cli.Put(ctx, "health", "good")
|
||||
cancel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (%s)", err, m.AdvertiseClientURL)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *member) newClientV3() (*clientv3.Client, error) {
|
||||
return clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{m.AdvertiseClientURL},
|
||||
DialTimeout: 5 * time.Second,
|
||||
})
|
||||
}
|
||||
|
||||
func (m *member) dialGRPC() (*grpc.ClientConn, error) {
|
||||
return grpc.Dial(m.grpcAddr(), grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock())
|
||||
}
|
||||
|
||||
// grpcAddr gets the host from clientURL so it works with grpc.Dial()
|
||||
func (m *member) grpcAddr() string {
|
||||
u, err := url.Parse(m.AdvertiseClientURL)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return u.Host
|
||||
}
|
||||
|
||||
func (m *member) peerPort() (port int) {
|
||||
u, err := url.Parse(m.AdvertisePeerURL)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, portStr, err := net.SplitHostPort(u.Host)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if _, err = fmt.Sscanf(portStr, "%d", &port); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return port
|
||||
}
|
||||
@ -1,64 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
caseTotalCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "funcational_tester",
|
||||
Name: "case_total",
|
||||
Help: "Total number of finished test cases",
|
||||
},
|
||||
[]string{"desc"},
|
||||
)
|
||||
|
||||
caseFailedTotalCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "funcational_tester",
|
||||
Name: "case_failed_total",
|
||||
Help: "Total number of failed test cases",
|
||||
},
|
||||
[]string{"desc"},
|
||||
)
|
||||
|
||||
roundTotalCounter = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "funcational_tester",
|
||||
Name: "round_total",
|
||||
Help: "Total number of finished test rounds.",
|
||||
})
|
||||
|
||||
roundFailedTotalCounter = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "funcational_tester",
|
||||
Name: "round_failed_total",
|
||||
Help: "Total number of failed test rounds.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(caseTotalCounter)
|
||||
prometheus.MustRegister(caseFailedTotalCounter)
|
||||
prometheus.MustRegister(roundTotalCounter)
|
||||
prometheus.MustRegister(roundFailedTotalCounter)
|
||||
}
|
||||
@ -1,57 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Status struct {
|
||||
Since time.Time
|
||||
Failures []string
|
||||
RoundLimit int
|
||||
|
||||
Cluster ClusterStatus
|
||||
cluster *cluster
|
||||
|
||||
mu sync.Mutex // guards Round and Case
|
||||
Round int
|
||||
Case int
|
||||
}
|
||||
|
||||
func (s *Status) setRound(r int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.Round = r
|
||||
}
|
||||
|
||||
func (s *Status) getRound() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.Round
|
||||
}
|
||||
|
||||
func (s *Status) setCase(c int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.Case = c
|
||||
}
|
||||
|
||||
func (s *Status) getCase() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.Case
|
||||
}
|
||||
@ -1,218 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type Stresser interface {
|
||||
// Stress starts to stress the etcd cluster
|
||||
Stress() error
|
||||
// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
|
||||
Pause()
|
||||
// Close releases all of the Stresser's resources.
|
||||
Close()
|
||||
// ModifiedKeys reports the number of keys created and deleted by stresser
|
||||
ModifiedKeys() int64
|
||||
// Checker returns an invariant checker for after the stresser is canceled.
|
||||
Checker() Checker
|
||||
}
|
||||
|
||||
// nopStresser implements Stresser that does nothing
|
||||
type nopStresser struct {
|
||||
start time.Time
|
||||
qps int
|
||||
}
|
||||
|
||||
func (s *nopStresser) Stress() error { return nil }
|
||||
func (s *nopStresser) Pause() {}
|
||||
func (s *nopStresser) Close() {}
|
||||
func (s *nopStresser) ModifiedKeys() int64 {
|
||||
return 0
|
||||
}
|
||||
func (s *nopStresser) Checker() Checker { return nil }
|
||||
|
||||
// compositeStresser implements a Stresser that runs a slice of
|
||||
// stressing clients concurrently.
|
||||
type compositeStresser struct {
|
||||
stressers []Stresser
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Stress() error {
|
||||
for i, s := range cs.stressers {
|
||||
if err := s.Stress(); err != nil {
|
||||
for j := 0; j < i; j++ {
|
||||
cs.stressers[i].Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Pause() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(cs.stressers))
|
||||
for i := range cs.stressers {
|
||||
go func(s Stresser) {
|
||||
defer wg.Done()
|
||||
s.Pause()
|
||||
}(cs.stressers[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Close() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(cs.stressers))
|
||||
for i := range cs.stressers {
|
||||
go func(s Stresser) {
|
||||
defer wg.Done()
|
||||
s.Close()
|
||||
}(cs.stressers[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
|
||||
for _, stress := range cs.stressers {
|
||||
modifiedKey += stress.ModifiedKeys()
|
||||
}
|
||||
return modifiedKey
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Checker() Checker {
|
||||
var chks []Checker
|
||||
for _, s := range cs.stressers {
|
||||
if chk := s.Checker(); chk != nil {
|
||||
chks = append(chks, chk)
|
||||
}
|
||||
}
|
||||
if len(chks) == 0 {
|
||||
return nil
|
||||
}
|
||||
return newCompositeChecker(chks)
|
||||
}
|
||||
|
||||
type stressConfig struct {
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
keyTxnSuffixRange int
|
||||
keyTxnOps int
|
||||
|
||||
numLeases int
|
||||
keysPerLease int
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
|
||||
etcdRunnerPath string
|
||||
}
|
||||
|
||||
// NewStresser creates stresser from a comma separated list of stresser types.
|
||||
func NewStresser(s string, sc *stressConfig, m *member) Stresser {
|
||||
types := strings.Split(s, ",")
|
||||
if len(types) > 1 {
|
||||
stressers := make([]Stresser, len(types))
|
||||
for i, stype := range types {
|
||||
stressers[i] = NewStresser(stype, sc, m)
|
||||
}
|
||||
return &compositeStresser{stressers}
|
||||
}
|
||||
switch s {
|
||||
case "nop":
|
||||
return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
|
||||
case "keys":
|
||||
// TODO: Too intensive stressing clients can panic etcd member with
|
||||
// 'out of memory' error. Put rate limits in server side.
|
||||
return &keyStresser{
|
||||
Endpoint: m.grpcAddr(),
|
||||
keyLargeSize: sc.keyLargeSize,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
keyTxnSuffixRange: sc.keyTxnSuffixRange,
|
||||
keyTxnOps: sc.keyTxnOps,
|
||||
N: 100,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
}
|
||||
case "v2keys":
|
||||
return &v2Stresser{
|
||||
Endpoint: m.ClientURL,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
N: 100,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
}
|
||||
case "lease":
|
||||
return &leaseStresser{
|
||||
endpoint: m.grpcAddr(),
|
||||
numLeases: sc.numLeases,
|
||||
keysPerLease: sc.keysPerLease,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
}
|
||||
case "election-runner":
|
||||
reqRate := 100
|
||||
args := []string{
|
||||
"election",
|
||||
fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
|
||||
"--dial-timeout=10s",
|
||||
"--endpoints", m.grpcAddr(),
|
||||
"--total-client-connections=10",
|
||||
"--rounds=0", // runs forever
|
||||
"--req-rate", fmt.Sprintf("%v", reqRate),
|
||||
}
|
||||
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
|
||||
case "watch-runner":
|
||||
reqRate := 100
|
||||
args := []string{
|
||||
"watcher",
|
||||
"--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
|
||||
"--total-keys=1",
|
||||
"--total-prefixes=1",
|
||||
"--watch-per-prefix=1",
|
||||
"--endpoints", m.grpcAddr(),
|
||||
"--rounds=0", // runs forever
|
||||
"--req-rate", fmt.Sprintf("%v", reqRate),
|
||||
}
|
||||
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
|
||||
case "lock-racer-runner":
|
||||
reqRate := 100
|
||||
args := []string{
|
||||
"lock-racer",
|
||||
fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
|
||||
"--endpoints", m.grpcAddr(),
|
||||
"--total-client-connections=10",
|
||||
"--rounds=0", // runs forever
|
||||
"--req-rate", fmt.Sprintf("%v", reqRate),
|
||||
}
|
||||
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
|
||||
case "lease-runner":
|
||||
args := []string{
|
||||
"lease-renewer",
|
||||
"--ttl=30",
|
||||
"--endpoints", m.grpcAddr(),
|
||||
}
|
||||
return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
|
||||
default:
|
||||
plog.Panicf("unknown stresser type: %s\n", s)
|
||||
}
|
||||
return nil // never reach here
|
||||
}
|
||||
@ -1,286 +0,0 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
type tester struct {
|
||||
cluster *cluster
|
||||
limit int
|
||||
exitOnFailure bool
|
||||
|
||||
failures []failure
|
||||
status Status
|
||||
currentRevision int64
|
||||
|
||||
stresserType string
|
||||
scfg stressConfig
|
||||
doChecks bool
|
||||
|
||||
stresser Stresser
|
||||
checker Checker
|
||||
}
|
||||
|
||||
// compactQPS is rough number of compact requests per second.
|
||||
// Previous tests showed etcd can compact about 60,000 entries per second.
|
||||
const compactQPS = 50000
|
||||
|
||||
func (tt *tester) runLoop() {
|
||||
tt.status.Since = time.Now()
|
||||
tt.status.RoundLimit = tt.limit
|
||||
tt.status.cluster = tt.cluster
|
||||
for _, f := range tt.failures {
|
||||
tt.status.Failures = append(tt.status.Failures, f.Desc())
|
||||
}
|
||||
|
||||
if err := tt.resetStressCheck(); err != nil {
|
||||
plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err)
|
||||
tt.failed()
|
||||
return
|
||||
}
|
||||
|
||||
var preModifiedKey int64
|
||||
for round := 0; round < tt.limit || tt.limit == -1; round++ {
|
||||
tt.status.setRound(round)
|
||||
roundTotalCounter.Inc()
|
||||
|
||||
if err := tt.doRound(round); err != nil {
|
||||
plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
|
||||
if tt.cleanup() != nil {
|
||||
return
|
||||
}
|
||||
// reset preModifiedKey after clean up
|
||||
preModifiedKey = 0
|
||||
continue
|
||||
}
|
||||
// -1 so that logPrefix doesn't print out 'case'
|
||||
tt.status.setCase(-1)
|
||||
|
||||
revToCompact := max(0, tt.currentRevision-10000)
|
||||
currentModifiedKey := tt.stresser.ModifiedKeys()
|
||||
modifiedKey := currentModifiedKey - preModifiedKey
|
||||
preModifiedKey = currentModifiedKey
|
||||
timeout := 10 * time.Second
|
||||
timeout += time.Duration(modifiedKey/compactQPS) * time.Second
|
||||
plog.Infof("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
|
||||
if err := tt.compact(revToCompact, timeout); err != nil {
|
||||
plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err)
|
||||
if tt.cleanup() != nil {
|
||||
return
|
||||
}
|
||||
// reset preModifiedKey after clean up
|
||||
preModifiedKey = 0
|
||||
}
|
||||
if round > 0 && round%500 == 0 { // every 500 rounds
|
||||
if err := tt.defrag(); err != nil {
|
||||
plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
|
||||
tt.failed()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
plog.Infof("%s functional-tester is finished", tt.logPrefix())
|
||||
}
|
||||
|
||||
func (tt *tester) doRound(round int) error {
|
||||
for j, f := range tt.failures {
|
||||
caseTotalCounter.WithLabelValues(f.Desc()).Inc()
|
||||
tt.status.setCase(j)
|
||||
|
||||
if err := tt.cluster.WaitHealth(); err != nil {
|
||||
return fmt.Errorf("wait full health error: %v", err)
|
||||
}
|
||||
plog.Infof("%s injecting failure %q", tt.logPrefix(), f.Desc())
|
||||
if err := f.Inject(tt.cluster, round); err != nil {
|
||||
return fmt.Errorf("injection error: %v", err)
|
||||
}
|
||||
plog.Infof("%s injected failure", tt.logPrefix())
|
||||
|
||||
plog.Infof("%s recovering failure %q", tt.logPrefix(), f.Desc())
|
||||
if err := f.Recover(tt.cluster, round); err != nil {
|
||||
return fmt.Errorf("recovery error: %v", err)
|
||||
}
|
||||
plog.Infof("%s recovered failure", tt.logPrefix())
|
||||
tt.pauseStresser()
|
||||
plog.Infof("%s wait until cluster is healthy", tt.logPrefix())
|
||||
if err := tt.cluster.WaitHealth(); err != nil {
|
||||
return fmt.Errorf("wait full health error: %v", err)
|
||||
}
|
||||
plog.Infof("%s cluster is healthy", tt.logPrefix())
|
||||
|
||||
plog.Infof("%s checking consistency and invariant of cluster", tt.logPrefix())
|
||||
if err := tt.checkConsistency(); err != nil {
|
||||
return fmt.Errorf("tt.checkConsistency error (%v)", err)
|
||||
}
|
||||
plog.Infof("%s checking consistency and invariant of cluster done", tt.logPrefix())
|
||||
|
||||
plog.Infof("%s succeed!", tt.logPrefix())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *tester) updateRevision() error {
|
||||
revs, _, err := tt.cluster.getRevisionHash()
|
||||
for _, rev := range revs {
|
||||
tt.currentRevision = rev
|
||||
break // just need get one of the current revisions
|
||||
}
|
||||
|
||||
plog.Infof("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision)
|
||||
return err
|
||||
}
|
||||
|
||||
func (tt *tester) checkConsistency() (err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err = tt.updateRevision(); err != nil {
|
||||
plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
|
||||
return
|
||||
}
|
||||
err = tt.startStresser()
|
||||
}()
|
||||
if err = tt.checker.Check(); err != nil {
|
||||
plog.Infof("%s %v", tt.logPrefix(), err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
|
||||
tt.pauseStresser()
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tt.startStresser()
|
||||
}
|
||||
}()
|
||||
|
||||
plog.Infof("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
|
||||
if err = tt.cluster.compactKV(rev, timeout); err != nil {
|
||||
return err
|
||||
}
|
||||
plog.Infof("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
|
||||
|
||||
plog.Infof("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
|
||||
if err = tt.cluster.checkCompact(rev); err != nil {
|
||||
plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
plog.Infof("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *tester) defrag() error {
|
||||
plog.Infof("%s defragmenting...", tt.logPrefix())
|
||||
if err := tt.cluster.defrag(); err != nil {
|
||||
plog.Warningf("%s defrag error (%v)", tt.logPrefix(), err)
|
||||
if cerr := tt.cleanup(); cerr != nil {
|
||||
return fmt.Errorf("%s, %s", err, cerr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
plog.Infof("%s defragmented...", tt.logPrefix())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *tester) logPrefix() string {
|
||||
var (
|
||||
rd = tt.status.getRound()
|
||||
cs = tt.status.getCase()
|
||||
prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs)
|
||||
)
|
||||
if cs == -1 {
|
||||
prefix = fmt.Sprintf("[round#%d]", rd)
|
||||
}
|
||||
return prefix
|
||||
}
|
||||
|
||||
func (tt *tester) failed() {
|
||||
if !tt.exitOnFailure {
|
||||
return
|
||||
}
|
||||
plog.Warningf("%s exiting on failure", tt.logPrefix())
|
||||
tt.cluster.Terminate()
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
func (tt *tester) cleanup() error {
|
||||
defer tt.failed()
|
||||
|
||||
roundFailedTotalCounter.Inc()
|
||||
desc := "compact/defrag"
|
||||
if tt.status.Case != -1 {
|
||||
desc = tt.failures[tt.status.Case].Desc()
|
||||
}
|
||||
caseFailedTotalCounter.WithLabelValues(desc).Inc()
|
||||
|
||||
tt.closeStresser()
|
||||
if err := tt.cluster.Cleanup(); err != nil {
|
||||
plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
|
||||
return err
|
||||
}
|
||||
if err := tt.cluster.Reset(); err != nil {
|
||||
plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err)
|
||||
return err
|
||||
}
|
||||
return tt.resetStressCheck()
|
||||
}
|
||||
|
||||
func (tt *tester) pauseStresser() {
|
||||
plog.Infof("%s pausing the stressing clients...", tt.logPrefix())
|
||||
tt.stresser.Pause()
|
||||
plog.Infof("%s paused stressing clients", tt.logPrefix())
|
||||
}
|
||||
|
||||
func (tt *tester) startStresser() (err error) {
|
||||
plog.Infof("%s starting the stressing clients...", tt.logPrefix())
|
||||
err = tt.stresser.Stress()
|
||||
plog.Infof("%s started stressing clients", tt.logPrefix())
|
||||
return err
|
||||
}
|
||||
|
||||
func (tt *tester) closeStresser() {
|
||||
plog.Infof("%s closing the stressing clients...", tt.logPrefix())
|
||||
tt.stresser.Close()
|
||||
plog.Infof("%s closed stressing clients", tt.logPrefix())
|
||||
}
|
||||
|
||||
func (tt *tester) resetStressCheck() error {
|
||||
plog.Infof("%s resetting stressing clients and checkers...", tt.logPrefix())
|
||||
cs := &compositeStresser{}
|
||||
for _, m := range tt.cluster.Members {
|
||||
s := NewStresser(tt.stresserType, &tt.scfg, m)
|
||||
cs.stressers = append(cs.stressers, s)
|
||||
}
|
||||
tt.stresser = cs
|
||||
if !tt.doChecks {
|
||||
tt.checker = newNoChecker()
|
||||
return tt.startStresser()
|
||||
}
|
||||
chk := newHashChecker(hashAndRevGetter(tt.cluster))
|
||||
if schk := cs.Checker(); schk != nil {
|
||||
chk = newCompositeChecker([]Checker{chk, schk})
|
||||
}
|
||||
tt.checker = chk
|
||||
return tt.startStresser()
|
||||
}
|
||||
|
||||
func (tt *tester) Report() int64 { return tt.stresser.ModifiedKeys() }
|
||||
@ -1,51 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func getSameValue(vals map[string]int64) bool {
|
||||
var rv int64
|
||||
for _, v := range vals {
|
||||
if rv == 0 {
|
||||
rv = v
|
||||
}
|
||||
if rv != v {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func max(n1, n2 int64) int64 {
|
||||
if n1 > n2 {
|
||||
return n1
|
||||
}
|
||||
return n2
|
||||
}
|
||||
|
||||
func errsToError(errs []error) error {
|
||||
if len(errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
stringArr := make([]string, len(errs))
|
||||
for i, err := range errs {
|
||||
stringArr[i] = err.Error()
|
||||
}
|
||||
return fmt.Errorf(strings.Join(stringArr, ", "))
|
||||
}
|
||||
@ -1,117 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
clientV2 "github.com/coreos/etcd/client"
|
||||
)
|
||||
|
||||
type v2Stresser struct {
|
||||
Endpoint string
|
||||
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
|
||||
N int
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
atomicModifiedKey int64
|
||||
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Stress() error {
|
||||
cfg := clientV2.Config{
|
||||
Endpoints: []string{s.Endpoint},
|
||||
Transport: &http.Transport{
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
MaxIdleConnsPerHost: s.N,
|
||||
},
|
||||
}
|
||||
c, err := clientV2.New(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kv := clientV2.NewKeysAPI(c)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.cancel = cancel
|
||||
s.wg.Add(s.N)
|
||||
for i := 0; i < s.N; i++ {
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
s.run(ctx, kv)
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
|
||||
for {
|
||||
if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
|
||||
return
|
||||
}
|
||||
setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
|
||||
key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
|
||||
_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
|
||||
if err == nil {
|
||||
atomic.AddInt64(&s.atomicModifiedKey, 1)
|
||||
}
|
||||
setcancel()
|
||||
if err == context.Canceled {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Pause() {
|
||||
s.cancel()
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Close() {
|
||||
s.Pause()
|
||||
}
|
||||
|
||||
func (s *v2Stresser) ModifiedKeys() int64 {
|
||||
return atomic.LoadInt64(&s.atomicModifiedKey)
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Checker() Checker { return nil }
|
||||
|
||||
func randBytes(size int) []byte {
|
||||
data := make([]byte, size)
|
||||
for i := 0; i < size; i++ {
|
||||
data[i] = byte(int('a') + rand.Intn(26))
|
||||
}
|
||||
return data
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user