Mirror of https://github.com/etcd-io/etcd.git
functional-tester/tester: rename logger field
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Parent: c9161b1f5c
Commit: eb0c66f912
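The diff below is mechanical: every `logger *zap.Logger` field, parameter, and call site in the tester package becomes `lg`, presumably to match the shorter name used for zap loggers elsewhere in the codebase. A minimal sketch of the pattern on a stripped-down hashChecker (the real struct also carries a hashAndRevGetter; this stub is for illustration only):

	package main

	import "go.uber.org/zap"

	// After this commit the field is named lg rather than logger.
	type hashChecker struct {
		lg *zap.Logger
	}

	func newHashChecker(lg *zap.Logger) *hashChecker {
		return &hashChecker{lg: lg}
	}

	func main() {
		hc := newHashChecker(zap.NewExample())
		hc.lg.Info("renamed logger field in use")
	}

Call sites change in lockstep: clus.logger.Info(...) becomes clus.lg.Info(...), and constructors thread lg through instead of logger.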
@@ -39,13 +39,13 @@ type hashAndRevGetter interface {
 }

 type hashChecker struct {
-	logger *zap.Logger
+	lg *zap.Logger
 	hrg hashAndRevGetter
 }

-func newHashChecker(logger *zap.Logger, hrg hashAndRevGetter) Checker {
+func newHashChecker(lg *zap.Logger, hrg hashAndRevGetter) Checker {
 	return &hashChecker{
-		logger: logger,
+		lg: lg,
 		hrg: hrg,
 	}
 }
@@ -62,7 +62,7 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
 	for i := 0; i < retries; i++ {
 		revs, hashes, err = hc.hrg.getRevisionHash()
 		if err != nil {
-			hc.logger.Warn(
+			hc.lg.Warn(
 				"failed to get revision and hash",
 				zap.Int("retries", i),
 				zap.Error(err),
@@ -73,7 +73,7 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
 		if sameRev && sameHashes {
 			return nil
 		}
-		hc.logger.Warn(
+		hc.lg.Warn(
 			"retrying; etcd cluster is not stable",
 			zap.Int("retries", i),
 			zap.Bool("same-revisions", sameRev),
@@ -97,7 +97,7 @@ func (hc *hashChecker) Check() error {
 }

 type leaseChecker struct {
-	logger *zap.Logger
+	lg *zap.Logger

 	endpoint string // TODO: use Member

@@ -157,7 +157,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 			return nil
 		}
 		if err != nil {
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"retrying; Lease TimeToLive failed",
 				zap.Int("retries", i),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -167,7 +167,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 		}
 		if resp.TTL > 0 {
 			dur := time.Duration(resp.TTL) * time.Second
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"lease has not been expired, wait until expire",
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Int64("ttl", resp.TTL),
@@ -175,7 +175,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 			)
 			time.Sleep(dur)
 		} else {
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"lease expired but not yet revoked",
 				zap.Int("retries", i),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -195,7 +195,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
 	keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
 			zap.String("endpoint", lc.endpoint),
 			zap.Error(err),
@@ -204,7 +204,7 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in
 	}
 	leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasLeaseExpired failed",
 			zap.String("endpoint", lc.endpoint),
 			zap.Error(err),
@@ -248,7 +248,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
 		} else {
 			return resp.TTL == -1, nil
 		}
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasLeaseExpired getLeaseByID failed",
 			zap.String("endpoint", lc.endpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -267,7 +267,7 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease
 		RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
 	}, grpc.FailFast(false))
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
 			zap.String("endpoint", lc.endpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
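The checkRevAndHashes hunks above follow a common shape in this package: call getRevisionHash in a bounded retry loop, log each failure or instability at Warn, and only return once revisions and hashes agree. A self-contained sketch of that retry shape, with a stub standing in for the real revision/hash fetch (function names and the retry count here are illustrative, not etcd's):

	package main

	import (
		"errors"
		"fmt"
	)

	// checkWithRetries mirrors the loop in checkRevAndHashes: retry the
	// fetch, report each failure, and give up after a fixed number of tries.
	func checkWithRetries(retries int, fetch func() error) error {
		var err error
		for i := 0; i < retries; i++ {
			if err = fetch(); err != nil {
				fmt.Println("failed to get revision and hash; retries =", i)
				continue
			}
			return nil
		}
		return err
	}

	func main() {
		calls := 0
		err := checkWithRetries(5, func() error {
			calls++
			if calls < 3 {
				return errors.New("etcd cluster is not stable")
			}
			return nil
		})
		fmt.Println("result:", err) // succeeds on the third attempt
	}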
@@ -38,7 +38,7 @@ import (

 // Cluster defines tester cluster.
 type Cluster struct {
-	logger *zap.Logger
+	lg *zap.Logger

 	agentConns []*grpc.ClientConn
 	agentClients []rpcpb.TransportClient
@@ -61,15 +61,15 @@ type Cluster struct {
 	cs int
 }

-func newCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
-	logger.Info("reading configuration file", zap.String("path", fpath))
+func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
+	lg.Info("reading configuration file", zap.String("path", fpath))
 	bts, err := ioutil.ReadFile(fpath)
 	if err != nil {
 		return nil, err
 	}
-	logger.Info("opened configuration file", zap.String("path", fpath))
+	lg.Info("opened configuration file", zap.String("path", fpath))

-	clus := &Cluster{logger: logger}
+	clus := &Cluster{lg: lg}
 	if err = yaml.Unmarshal(bts, clus); err != nil {
 		return nil, err
 	}
@@ -192,8 +192,8 @@ var dialOpts = []grpc.DialOption{
 }

 // NewCluster creates a client from a tester configuration.
-func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
-	clus, err := newCluster(logger, fpath)
+func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
+	clus, err := newCluster(lg, fpath)
 	if err != nil {
 		return nil, err
 	}
@@ -205,21 +205,21 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
 	clus.failures = make([]Failure, 0)

 	for i, ap := range clus.Members {
-		logger.Info("connecting", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr))
 		var err error
 		clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
 		if err != nil {
 			return nil, err
 		}
 		clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
-		logger.Info("connected", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))

-		logger.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
 		clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
 		if err != nil {
 			return nil, err
 		}
-		logger.Info("created stream", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
 	}

 	mux := http.NewServeMux()
@@ -246,18 +246,18 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
 }

 func (clus *Cluster) serveTesterServer() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"started tester HTTP server",
 		zap.String("tester-address", clus.Tester.TesterAddr),
 	)
 	err := clus.testerHTTPServer.ListenAndServe()
-	clus.logger.Info(
+	clus.lg.Info(
 		"tester HTTP server returned",
 		zap.String("tester-address", clus.Tester.TesterAddr),
 		zap.Error(err),
 	)
 	if err != nil && err != http.ErrServerClosed {
-		clus.logger.Fatal("tester HTTP errored", zap.Error(err))
+		clus.lg.Fatal("tester HTTP errored", zap.Error(err))
 	}
 }

@@ -291,7 +291,7 @@ func (clus *Cluster) updateFailures() {
 	case "FAILPOINTS":
 		fpFailures, fperr := failpointFailures(clus)
 		if len(fpFailures) == 0 {
-			clus.logger.Info("no failpoints found!", zap.Error(fperr))
+			clus.lg.Info("no failpoints found!", zap.Error(fperr))
 		}
 		clus.failures = append(clus.failures, fpFailures...)
 	case "NO_FAIL":
@@ -316,13 +316,13 @@ func (clus *Cluster) shuffleFailures() {
 	n := len(clus.failures)
 	cp := coprime(n)

-	clus.logger.Info("shuffling test failure cases", zap.Int("total", n))
+	clus.lg.Info("shuffling test failure cases", zap.Int("total", n))
 	fs := make([]Failure, n)
 	for i := 0; i < n; i++ {
 		fs[i] = clus.failures[(cp*i+offset)%n]
 	}
 	clus.failures = fs
-	clus.logger.Info("shuffled test failure cases", zap.Int("total", n))
+	clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
 }

 /*
@@ -354,7 +354,7 @@ func gcd(x, y int) int {
 }

 func (clus *Cluster) updateStresserChecker() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"updating stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -367,7 +367,7 @@ func (clus *Cluster) updateStresserChecker() {
 	clus.stresser = cs

 	if clus.Tester.ConsistencyCheck {
-		clus.checker = newHashChecker(clus.logger, hashAndRevGetter(clus))
+		clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
 		if schk := cs.Checker(); schk != nil {
 			clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
 		}
@@ -375,7 +375,7 @@ func (clus *Cluster) updateStresserChecker() {
 		clus.checker = newNoChecker()
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"updated stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -383,13 +383,13 @@ func (clus *Cluster) updateStresserChecker() {
 }

 func (clus *Cluster) startStresser() (err error) {
-	clus.logger.Info(
+	clus.lg.Info(
 		"starting stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	err = clus.stresser.Stress()
-	clus.logger.Info(
+	clus.lg.Info(
 		"started stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -398,13 +398,13 @@ func (clus *Cluster) startStresser() (err error) {
 }

 func (clus *Cluster) closeStresser() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"closing stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	clus.stresser.Close()
-	clus.logger.Info(
+	clus.lg.Info(
 		"closed stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -412,13 +412,13 @@ func (clus *Cluster) closeStresser() {
 }

 func (clus *Cluster) pauseStresser() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"pausing stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	clus.stresser.Pause()
-	clus.logger.Info(
+	clus.lg.Info(
 		"paused stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -431,7 +431,7 @@ func (clus *Cluster) checkConsistency() (err error) {
 		return
 	}
 	if err = clus.updateRevision(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"updateRevision failed",
 			zap.Error(err),
 		)
@@ -440,20 +440,20 @@ func (clus *Cluster) checkConsistency() (err error) {
 		err = clus.startStresser()
 	}()

-	clus.logger.Info(
+	clus.lg.Info(
 		"checking consistency and invariant of cluster",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 		zap.String("desc", clus.failures[clus.cs].Desc()),
 	)
 	if err = clus.checker.Check(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"checker.Check failed",
 			zap.Error(err),
 		)
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"checked consistency and invariant of cluster",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -488,7 +488,7 @@ func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
 			strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
 			// agent server has already closed;
 			// so this error is expected
-			clus.logger.Info(
+			clus.lg.Info(
 				"successfully destroyed",
 				zap.String("member", clus.Members[i].EtcdClientEndpoint),
 			)
@@ -511,13 +511,13 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		clus.agentRequests[idx].Operation = op
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"sending request",
 		zap.String("operation", op.String()),
 		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
 	)
 	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
-	clus.logger.Info(
+	clus.lg.Info(
 		"sent request",
 		zap.String("operation", op.String()),
 		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
@@ -527,14 +527,14 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		return err
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"receiving response",
 		zap.String("operation", op.String()),
 		zap.String("from", clus.Members[idx].EtcdClientEndpoint),
 	)
 	resp, err := clus.agentStreams[idx].Recv()
 	if resp != nil {
-		clus.logger.Info(
+		clus.lg.Info(
 			"received response",
 			zap.String("operation", op.String()),
 			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
@@ -543,7 +543,7 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 			zap.Error(err),
 		)
 	} else {
-		clus.logger.Info(
+		clus.lg.Info(
 			"received empty response",
 			zap.String("operation", op.String()),
 			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
@@ -562,26 +562,26 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {

 // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
 func (clus *Cluster) DestroyEtcdAgents() {
-	clus.logger.Info("destroying etcd servers and agents")
+	clus.lg.Info("destroying etcd servers and agents")
 	err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
 	if err != nil {
-		clus.logger.Warn("failed to destroy etcd servers and agents", zap.Error(err))
+		clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err))
 	} else {
-		clus.logger.Info("destroyed etcd servers and agents")
+		clus.lg.Info("destroyed etcd servers and agents")
 	}

 	for i, conn := range clus.agentConns {
-		clus.logger.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
+		clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
 		err := conn.Close()
-		clus.logger.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
+		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
 	}

 	if clus.testerHTTPServer != nil {
-		clus.logger.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
+		clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
 		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 		err := clus.testerHTTPServer.Shutdown(ctx)
 		cancel()
-		clus.logger.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
+		clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
 	}
 }

@@ -595,13 +595,13 @@ func (clus *Cluster) WaitHealth() error {
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
 	for i := 0; i < 60; i++ {
 		for _, m := range clus.Members {
-			clus.logger.Info(
+			clus.lg.Info(
 				"writing health key",
 				zap.Int("retries", i),
 				zap.String("endpoint", m.EtcdClientEndpoint),
 			)
 			if err = m.WriteHealthKey(); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"writing health key failed",
 					zap.Int("retries", i),
 					zap.String("endpoint", m.EtcdClientEndpoint),
@@ -609,14 +609,14 @@ func (clus *Cluster) WaitHealth() error {
 				)
 				break
 			}
-			clus.logger.Info(
+			clus.lg.Info(
 				"wrote health key",
 				zap.Int("retries", i),
 				zap.String("endpoint", m.EtcdClientEndpoint),
 			)
 		}
 		if err == nil {
-			clus.logger.Info(
+			clus.lg.Info(
 				"writing health key success on all members",
 				zap.Int("retries", i),
 			)
@@ -683,7 +683,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 	for i, m := range clus.Members {
 		conn, derr := m.DialEtcdGRPCServer()
 		if derr != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"compactKV dial failed",
 				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Error(derr),
@@ -693,7 +693,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		}
 		kvc := pb.NewKVClient(conn)

-		clus.logger.Info(
+		clus.lg.Info(
 			"compacting",
 			zap.String("endpoint", m.EtcdClientEndpoint),
 			zap.Int64("compact-revision", rev),
@@ -709,14 +709,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		succeed := true
 		if cerr != nil {
 			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
-				clus.logger.Info(
+				clus.lg.Info(
 					"compact error is ignored",
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Int64("compact-revision", rev),
 					zap.Error(cerr),
 				)
 			} else {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"compact failed",
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Int64("compact-revision", rev),
@@ -728,7 +728,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		}

 		if succeed {
-			clus.logger.Info(
+			clus.lg.Info(
 				"compacted",
 				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Int64("compact-revision", rev),
@@ -753,14 +753,14 @@ func (clus *Cluster) checkCompact(rev int64) error {
 }

 func (clus *Cluster) defrag() error {
-	clus.logger.Info(
+	clus.lg.Info(
 		"defragmenting",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	for _, m := range clus.Members {
 		if err := m.Defrag(); err != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"defrag failed",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
@@ -769,7 +769,7 @@ func (clus *Cluster) defrag() error {
 			return err
 		}
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"defragmented",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
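One detail worth noting in the hunks above: shuffleFailures permutes the failure cases without a random-number generator by walking them with a stride from coprime(n), so that (cp*i+offset)%n visits every index exactly once. A standalone sketch of why that works; the gcd signature matches the hunk header above, while this particular coprime search and the fixed offset are assumptions made for illustration:

	package main

	import "fmt"

	func gcd(x, y int) int {
		if y == 0 {
			return x
		}
		return gcd(y, x%y)
	}

	// coprime returns some stride cp with gcd(cp, n) == 1; the search
	// range here is hypothetical, only the coprimality contract matters.
	func coprime(n int) int {
		for cp := n/2 + 1; cp < n; cp++ {
			if gcd(cp, n) == 1 {
				return cp
			}
		}
		return 1
	}

	func main() {
		n, offset := 7, 3 // offset stands in for the package-level variable
		cp := coprime(n)
		for i := 0; i < n; i++ {
			fmt.Print((cp*i+offset)%n, " ")
		}
		fmt.Println() // prints a permutation: every index 0..6 appears once
	}

Because cp shares no factor with n, the map i -> (cp*i+offset) mod n is a bijection on 0..n-1, which is exactly what a shuffle needs.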
@@ -37,7 +37,7 @@ func (clus *Cluster) StartTester() {
 		clus.rd = round

 		if err := clus.doRound(); err != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"doRound failed; returning",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
@@ -60,21 +60,21 @@ func (clus *Cluster) StartTester() {
 			preModifiedKey = currentModifiedKey
 			timeout := 10 * time.Second
 			timeout += time.Duration(modifiedKey/compactQPS) * time.Second
-			clus.logger.Info(
+			clus.lg.Info(
 				"compacting",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
 				zap.Duration("timeout", timeout),
 			)
 			if err := clus.compact(revToCompact, timeout); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"compact failed",
 					zap.Int("round", clus.rd),
 					zap.Int("case", clus.cs),
 					zap.Error(err),
 				)
 				if err = clus.cleanup(); err != nil {
-					clus.logger.Warn(
+					clus.lg.Warn(
 						"cleanup failed",
 						zap.Int("round", clus.rd),
 						zap.Int("case", clus.cs),
@@ -87,7 +87,7 @@ func (clus *Cluster) StartTester() {
 		}
 		if round > 0 && round%500 == 0 { // every 500 rounds
 			if err := clus.defrag(); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"defrag failed; returning",
 					zap.Int("round", clus.rd),
 					zap.Int("case", clus.cs),
@@ -99,7 +99,7 @@ func (clus *Cluster) StartTester() {
 		}
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"functional-tester passed",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -111,7 +111,7 @@ func (clus *Cluster) doRound() error {
 		clus.shuffleFailures()
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"starting round",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
@@ -121,12 +121,12 @@ func (clus *Cluster) doRound() error {

 		caseTotalCounter.WithLabelValues(f.Desc()).Inc()

-		clus.logger.Info("wait health before injecting failures")
+		clus.lg.Info("wait health before injecting failures")
 		if err := clus.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}

-		clus.logger.Info(
+		clus.lg.Info(
 			"injecting failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -135,7 +135,7 @@ func (clus *Cluster) doRound() error {
 		if err := f.Inject(clus); err != nil {
 			return fmt.Errorf("injection error: %v", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"injected failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -145,7 +145,7 @@ func (clus *Cluster) doRound() error {
 		// if run local, recovering server may conflict
 		// with stressing client ports
 		// TODO: use unix for local tests
-		clus.logger.Info(
+		clus.lg.Info(
 			"recovering failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -154,26 +154,26 @@ func (clus *Cluster) doRound() error {
 		if err := f.Recover(clus); err != nil {
 			return fmt.Errorf("recovery error: %v", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"recovered failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.String("desc", f.Desc()),
 		)

-		clus.logger.Info("pausing stresser after failure recovery, before wait health")
+		clus.lg.Info("pausing stresser after failure recovery, before wait health")
 		clus.pauseStresser()

-		clus.logger.Info("wait health after recovering failures")
+		clus.lg.Info("wait health after recovering failures")
 		if err := clus.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
-		clus.logger.Info("check consistency after recovering failures")
+		clus.lg.Info("check consistency after recovering failures")
 		if err := clus.checkConsistency(); err != nil {
 			return fmt.Errorf("tt.checkConsistency error (%v)", err)
 		}

-		clus.logger.Info(
+		clus.lg.Info(
 			"failure case passed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -181,7 +181,7 @@ func (clus *Cluster) doRound() error {
 		)
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"finished round",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
@@ -196,7 +196,7 @@ func (clus *Cluster) updateRevision() error {
 		break // just need get one of the current revisions
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"updated current revision",
 		zap.Int64("current-revision", clus.currentRevision),
 	)
@@ -204,7 +204,7 @@ func (clus *Cluster) updateRevision() error {
 }

 func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
-	clus.logger.Info("pausing stresser before compact")
+	clus.lg.Info("pausing stresser before compact")
 	clus.pauseStresser()
 	defer func() {
 		if err == nil {
@@ -212,7 +212,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 		}
 	}()

-	clus.logger.Info(
+	clus.lg.Info(
 		"compacting storage",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
@@ -220,19 +220,19 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 	if err = clus.compactKV(rev, timeout); err != nil {
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"compacted storage",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)

-	clus.logger.Info(
+	clus.lg.Info(
 		"checking compaction",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)
 	if err = clus.checkCompact(rev); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"checkCompact failed",
 			zap.Int64("current-revision", clus.currentRevision),
 			zap.Int64("compact-revision", rev),
@@ -240,7 +240,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 		)
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"confirmed compaction",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
@@ -254,7 +254,7 @@ func (clus *Cluster) failed() {
 		return
 	}

-	clus.logger.Info(
+	clus.lg.Info(
 		"exiting on failure",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -275,7 +275,7 @@ func (clus *Cluster) cleanup() error {

 	clus.closeStresser()
 	if err := clus.FailArchive(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"cleanup failed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -284,7 +284,7 @@ func (clus *Cluster) cleanup() error {
 		return err
 	}
 	if err := clus.Restart(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"restart failed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -30,7 +30,7 @@ func (f *failureDelay) Inject(clus *Cluster) error {
 		return err
 	}
 	if f.delayDuration > 0 {
-		clus.logger.Info(
+		clus.lg.Info(
 			"sleeping in failureDelay",
 			zap.Duration("delay", f.delayDuration),
 			zap.String("case", f.Failure.Desc()),
@@ -21,6 +21,7 @@ import (
 	"go.uber.org/zap"
 )

+// Stresser defines stressing client operations.
 type Stresser interface {
 	// Stress starts to stress the etcd cluster
 	Stress() error
@@ -38,7 +39,7 @@ type Stresser interface {
 func newStresser(clus *Cluster, idx int) Stresser {
 	stressers := make([]Stresser, len(clus.Tester.StressTypes))
 	for i, stype := range clus.Tester.StressTypes {
-		clus.logger.Info("creating stresser", zap.String("type", stype))
+		clus.lg.Info("creating stresser", zap.String("type", stype))

 		switch stype {
 		case "NO_STRESS":
@@ -48,7 +49,7 @@ func newStresser(clus *Cluster, idx int) Stresser {
 			// TODO: Too intensive stressing clients can panic etcd member with
 			// 'out of memory' error. Put rate limits in server side.
 			stressers[i] = &keyStresser{
-				logger: clus.logger,
+				lg: clus.lg,
 				Endpoint: clus.Members[idx].EtcdClientEndpoint,
 				keySize: int(clus.Tester.StressKeySize),
 				keyLargeSize: int(clus.Tester.StressKeySizeLarge),
@@ -61,7 +62,7 @@ func newStresser(clus *Cluster, idx int) Stresser {

 		case "LEASE":
 			stressers[i] = &leaseStresser{
-				logger: clus.logger,
+				lg: clus.lg,
 				endpoint: clus.Members[idx].EtcdClientEndpoint,
 				numLeases: 10, // TODO: configurable
 				keysPerLease: 10, // TODO: configurable
@@ -33,7 +33,7 @@ import (
 )

 type keyStresser struct {
-	logger *zap.Logger
+	lg *zap.Logger

 	Endpoint string // TODO: use Member

@@ -96,7 +96,7 @@ func (s *keyStresser) Stress() error {
 		go s.run(ctx)
 	}

-	s.logger.Info(
+	s.lg.Info(
 		"key stresser started in background",
 		zap.String("endpoint", s.Endpoint),
 	)
@@ -150,7 +150,7 @@ func (s *keyStresser) run(ctx context.Context) {
 			// from stresser.Cancel method:
 			return
 		default:
-			s.logger.Warn(
+			s.lg.Warn(
 				"key stresser exited with error",
 				zap.String("endpoint", s.Endpoint),
 				zap.Error(err),
@@ -169,7 +169,7 @@ func (s *keyStresser) Close() {
 	s.conn.Close()
 	s.wg.Wait()

-	s.logger.Info(
+	s.lg.Info(
 		"key stresser is closed",
 		zap.String("endpoint", s.Endpoint),
 	)
@@ -37,7 +37,7 @@ const (
 )

 type leaseStresser struct {
-	logger *zap.Logger
+	lg *zap.Logger

 	endpoint string
 	cancel func()
@@ -122,7 +122,7 @@ func (ls *leaseStresser) setupOnce() error {
 }

 func (ls *leaseStresser) Stress() error {
-	ls.logger.Info(
+	ls.lg.Info(
 		"lease stresser is started",
 		zap.String("endpoint", ls.endpoint),
 	)
@@ -161,22 +161,22 @@ func (ls *leaseStresser) run() {
 		return
 	}

-	ls.logger.Debug(
+	ls.lg.Debug(
 		"lease stresser is creating leases",
 		zap.String("endpoint", ls.endpoint),
 	)
 	ls.createLeases()
-	ls.logger.Debug(
+	ls.lg.Debug(
 		"lease stresser created leases",
 		zap.String("endpoint", ls.endpoint),
 	)

-	ls.logger.Debug(
+	ls.lg.Debug(
 		"lease stresser is dropped leases",
 		zap.String("endpoint", ls.endpoint),
 	)
 	ls.randomlyDropLeases()
-	ls.logger.Debug(
+	ls.lg.Debug(
 		"lease stresser dropped leases",
 		zap.String("endpoint", ls.endpoint),
 	)
@@ -206,7 +206,7 @@ func (ls *leaseStresser) createAliveLeases() {
 			defer wg.Done()
 			leaseID, err := ls.createLeaseWithKeys(TTL)
 			if err != nil {
-				ls.logger.Debug(
+				ls.lg.Debug(
 					"createLeaseWithKeys failed",
 					zap.String("endpoint", ls.endpoint),
 					zap.Error(err),
@@ -244,7 +244,7 @@ func (ls *leaseStresser) createShortLivedLeases() {
 func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
 	leaseID, err := ls.createLease(ttl)
 	if err != nil {
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"createLease failed",
 			zap.String("endpoint", ls.endpoint),
 			zap.Error(err),
@@ -252,7 +252,7 @@ func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
 		return -1, err
 	}

-	ls.logger.Debug(
+	ls.lg.Debug(
 		"createLease created lease",
 		zap.String("endpoint", ls.endpoint),
 		zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -273,7 +273,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
 			// if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
 			// because we can't tell whether the lease is dropped or not.
 			if err != nil {
-				ls.logger.Debug(
+				ls.lg.Debug(
 					"randomlyDropLease failed",
 					zap.String("endpoint", ls.endpoint),
 					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -285,7 +285,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
 			if !dropped {
 				return
 			}
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"randomlyDropLease dropped a lease",
 				zap.String("endpoint", ls.endpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -314,7 +314,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		select {
 		case <-time.After(500 * time.Millisecond):
 		case <-ls.ctx.Done():
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive context canceled",
 				zap.String("endpoint", ls.endpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -328,7 +328,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			renewTime, ok := ls.aliveLeases.read(leaseID)
 			if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
 				ls.aliveLeases.remove(leaseID)
-				ls.logger.Debug(
+				ls.lg.Debug(
 					"keepLeaseAlive lease has not been renewed, dropped it",
 					zap.String("endpoint", ls.endpoint),
 					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -338,7 +338,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		}

 		if err != nil {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive lease creates stream error",
 				zap.String("endpoint", ls.endpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -351,14 +351,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			continue
 		}

-		ls.logger.Debug(
+		ls.lg.Debug(
 			"keepLeaseAlive stream sends lease keepalive request",
 			zap.String("endpoint", ls.endpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
 		if err != nil {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive stream failed to send lease keepalive request",
 				zap.String("endpoint", ls.endpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -367,14 +367,14 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			continue
 		}
 		leaseRenewTime := time.Now()
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"keepLeaseAlive stream sent lease keepalive request",
 			zap.String("endpoint", ls.endpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		respRC, err := stream.Recv()
 		if err != nil {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive stream failed to receive lease keepalive response",
 				zap.String("endpoint", ls.endpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -385,7 +385,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		// lease expires after TTL become 0
 		// don't send keepalive if the lease has expired
 		if respRC.TTL <= 0 {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive stream received lease keepalive response TTL <= 0",
 				zap.String("endpoint", ls.endpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -395,7 +395,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			return
 		}
 		// renew lease timestamp only if lease is present
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"keepLeaseAlive renewed a lease",
 			zap.String("endpoint", ls.endpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -444,7 +444,7 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
 		}
 	}

-	ls.logger.Debug(
+	ls.lg.Debug(
 		"randomlyDropLease error",
 		zap.String("endpoint", ls.endpoint),
 		zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -458,7 +458,7 @@ func (ls *leaseStresser) Pause() {
 }

 func (ls *leaseStresser) Close() {
-	ls.logger.Info(
+	ls.lg.Info(
 		"lease stresser is closing",
 		zap.String("endpoint", ls.endpoint),
 	)
@@ -466,7 +466,7 @@ func (ls *leaseStresser) Close() {
 	ls.runWg.Wait()
 	ls.aliveWg.Wait()
 	ls.conn.Close()
-	ls.logger.Info(
+	ls.lg.Info(
 		"lease stresser is closed",
 		zap.String("endpoint", ls.endpoint),
 	)
@@ -478,7 +478,7 @@ func (ls *leaseStresser) ModifiedKeys() int64 {

 func (ls *leaseStresser) Checker() Checker {
 	return &leaseChecker{
-		logger: ls.logger,
+		lg: ls.lg,
 		endpoint: ls.endpoint,
 		ls: ls,
 	}