diff --git a/tools/functional-tester/agent/handler.go b/tools/functional-tester/agent/handler.go
index e5a0cbba9..c8ea2d331 100644
--- a/tools/functional-tester/agent/handler.go
+++ b/tools/functional-tester/agent/handler.go
@@ -36,7 +36,7 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
 	defer func() {
 		if err == nil {
 			srv.last = req.Operation
-			srv.logger.Info("handler success", zap.String("operation", req.Operation.String()))
+			srv.lg.Info("handler success", zap.String("operation", req.Operation.String()))
 		}
 	}()
 
@@ -78,24 +78,24 @@ func (srv *Server) handleInitialStartEtcd(req *rpcpb.Request) (*rpcpb.Response,
 	srv.Member = req.Member
 	srv.Tester = req.Tester
 
-	srv.logger.Info("creating base directory", zap.String("path", srv.Member.BaseDir))
+	srv.lg.Info("creating base directory", zap.String("path", srv.Member.BaseDir))
 	err := fileutil.TouchDirAll(srv.Member.BaseDir)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("created base directory", zap.String("path", srv.Member.BaseDir))
+	srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
 
 	if err = srv.createEtcdFile(); err != nil {
 		return nil, err
 	}
 
 	srv.creatEtcdCmd()
 
-	srv.logger.Info("starting etcd")
+	srv.lg.Info("starting etcd")
 	err = srv.startEtcdCmd()
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
+	srv.lg.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
 
 	// wait some time for etcd listener start
 	// before setting up proxy
@@ -121,9 +121,9 @@ func (srv *Server) startProxy() error {
 			return err
 		}
 
-		srv.logger.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
+		srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
 		srv.advertiseClientPortToProxy[advertiseClientURLPort] = transport.NewProxy(transport.ProxyConfig{
-			Logger: srv.logger,
+			Logger: srv.lg,
 			From:   *advertiseClientURL,
 			To:     *listenClientURL,
 		})
@@ -131,7 +131,7 @@ func (srv *Server) startProxy() error {
 		case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error():
 			return err
 		case <-time.After(2 * time.Second):
-			srv.logger.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
+			srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
 		}
 	}
 
@@ -145,9 +145,9 @@ func (srv *Server) startProxy() error {
 			return err
 		}
 
-		srv.logger.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
+		srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
 		srv.advertisePeerPortToProxy[advertisePeerURLPort] = transport.NewProxy(transport.ProxyConfig{
-			Logger: srv.logger,
+			Logger: srv.lg,
 			From:   *advertisePeerURL,
 			To:     *listenPeerURL,
 		})
@@ -155,7 +155,7 @@ func (srv *Server) startProxy() error {
 		case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error():
 			return err
 		case <-time.After(2 * time.Second):
-			srv.logger.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
+			srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
 		}
 	}
 	return nil
@@ -164,13 +164,13 @@ func (srv *Server) startProxy() error {
 func (srv *Server) stopProxy() {
 	if srv.Member.EtcdClientProxy && len(srv.advertiseClientPortToProxy) > 0 {
 		for port, px := range srv.advertiseClientPortToProxy {
-			srv.logger.Info("closing proxy",
+			srv.lg.Info("closing proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
 			)
 			if err := px.Close(); err != nil {
-				srv.logger.Warn("failed to close proxy", zap.Int("port", port))
+				srv.lg.Warn("failed to close proxy", zap.Int("port", port))
 				continue
 			}
 			select {
@@ -179,7 +179,7 @@ func (srv *Server) stopProxy() {
 				time.Sleep(time.Second)
 			case <-time.After(time.Second):
 			}
-			srv.logger.Info("closed proxy",
+			srv.lg.Info("closed proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
@@ -189,13 +189,13 @@ func (srv *Server) stopProxy() {
 	}
 	if srv.Member.EtcdPeerProxy && len(srv.advertisePeerPortToProxy) > 0 {
 		for port, px := range srv.advertisePeerPortToProxy {
-			srv.logger.Info("closing proxy",
+			srv.lg.Info("closing proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
 			)
 			if err := px.Close(); err != nil {
-				srv.logger.Warn("failed to close proxy", zap.Int("port", port))
+				srv.lg.Warn("failed to close proxy", zap.Int("port", port))
 				continue
 			}
 			select {
@@ -204,7 +204,7 @@ func (srv *Server) stopProxy() {
 				time.Sleep(time.Second)
 			case <-time.After(time.Second):
 			}
-			srv.logger.Info("closed proxy",
+			srv.lg.Info("closed proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
@@ -215,20 +215,20 @@ func (srv *Server) stopProxy() {
 }
 
 func (srv *Server) createEtcdFile() error {
-	srv.logger.Info("creating etcd log file", zap.String("path", srv.Member.EtcdLogPath))
+	srv.lg.Info("creating etcd log file", zap.String("path", srv.Member.EtcdLogPath))
 	var err error
 	srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath)
 	if err != nil {
 		return err
 	}
-	srv.logger.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
+	srv.lg.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
 	return nil
 }
 
 func (srv *Server) creatEtcdCmd() {
 	etcdPath, etcdFlags := srv.Member.EtcdExecPath, srv.Member.Etcd.Flags()
 	u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
-	srv.logger.Info("creating etcd command",
+	srv.lg.Info("creating etcd command",
 		zap.String("etcd-exec-path", etcdPath),
 		zap.Strings("etcd-flags", etcdFlags),
 		zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
@@ -248,12 +248,12 @@ func (srv *Server) startEtcdCmd() error {
 func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
 	srv.creatEtcdCmd()
 
-	srv.logger.Info("restarting etcd")
+	srv.lg.Info("restarting etcd")
 	err := srv.startEtcdCmd()
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
+	srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
 
 	// wait some time for etcd listener start
 	// before setting up proxy
@@ -273,12 +273,12 @@ func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
 func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) {
 	srv.stopProxy()
 
-	srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
 	err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
 
 	return &rpcpb.Response{
 		Success: true,
@@ -290,18 +290,18 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
 	srv.stopProxy()
 
 	// exit with stackstrace
-	srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGQUIT.String()))
+	srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGQUIT.String()))
 	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
+	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 
 	srv.etcdLogFile.Sync()
 	srv.etcdLogFile.Close()
 
 	// TODO: support separate WAL directory
-	srv.logger.Info("archiving data", zap.String("base-dir", srv.Member.BaseDir))
+	srv.lg.Info("archiving data", zap.String("base-dir", srv.Member.BaseDir))
 	if err = archive(
 		srv.Member.BaseDir,
 		srv.Member.EtcdLogPath,
@@ -309,17 +309,17 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
 	); err != nil {
 		return nil, err
 	}
-	srv.logger.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
+	srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
 
 	if err = srv.createEtcdFile(); err != nil {
 		return nil, err
 	}
 
-	srv.logger.Info("cleaning up page cache")
+	srv.lg.Info("cleaning up page cache")
 	if err := cleanPageCache(); err != nil {
-		srv.logger.Warn("failed to clean up page cache", zap.String("error", err.Error()))
+		srv.lg.Warn("failed to clean up page cache", zap.String("error", err.Error()))
 	}
-	srv.logger.Info("cleaned up page cache")
+	srv.lg.Info("cleaned up page cache")
 
 	return &rpcpb.Response{
 		Success: true,
@@ -329,32 +329,32 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
 
 // stop proxy, etcd, delete data directory
 func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
-	srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
 	err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
 
-	srv.logger.Info("removing base directory", zap.String("dir", srv.Member.BaseDir))
+	srv.lg.Info("removing base directory", zap.String("dir", srv.Member.BaseDir))
 	err = os.RemoveAll(srv.Member.BaseDir)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
+	srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
 
 	// stop agent server
 	srv.Stop()
 
 	for port, px := range srv.advertiseClientPortToProxy {
-		srv.logger.Info("closing proxy", zap.Int("client-port", port))
+		srv.lg.Info("closing proxy", zap.Int("client-port", port))
 		err := px.Close()
-		srv.logger.Info("closed proxy", zap.Int("client-port", port), zap.Error(err))
+		srv.lg.Info("closed proxy", zap.Int("client-port", port), zap.Error(err))
 	}
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("closing proxy", zap.Int("peer-port", port))
+		srv.lg.Info("closing proxy", zap.Int("peer-port", port))
 		err := px.Close()
-		srv.logger.Info("closed proxy", zap.Int("peer-port", port), zap.Error(err))
+		srv.lg.Info("closed proxy", zap.Int("peer-port", port), zap.Error(err))
 	}
 
 	return &rpcpb.Response{
@@ -365,10 +365,10 @@ func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
 func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("blackholing", zap.Int("peer-port", port))
+		srv.lg.Info("blackholing", zap.Int("peer-port", port))
 		px.BlackholeTx()
 		px.BlackholeRx()
-		srv.logger.Info("blackholed", zap.Int("peer-port", port))
+		srv.lg.Info("blackholed", zap.Int("peer-port", port))
 	}
 	return &rpcpb.Response{
 		Success: true,
@@ -378,10 +378,10 @@ func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
 func (srv *Server) handleUnblackholePeerPortTxRx() (*rpcpb.Response, error) {
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("unblackholing", zap.Int("peer-port", port))
+		srv.lg.Info("unblackholing", zap.Int("peer-port", port))
 		px.UnblackholeTx()
 		px.UnblackholeRx()
-		srv.logger.Info("unblackholed", zap.Int("peer-port", port))
+		srv.lg.Info("unblackholed", zap.Int("peer-port", port))
 	}
 	return &rpcpb.Response{
 		Success: true,
@@ -394,14 +394,14 @@ func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
 	rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
 
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("delaying",
+		srv.lg.Info("delaying",
 			zap.Int("peer-port", port),
 			zap.Duration("latency", lat),
 			zap.Duration("random-variable", rv),
 		)
 		px.DelayTx(lat, rv)
 		px.DelayRx(lat, rv)
-		srv.logger.Info("delayed",
+		srv.lg.Info("delayed",
 			zap.Int("peer-port", port),
 			zap.Duration("latency", lat),
 			zap.Duration("random-variable", rv),
@@ -416,10 +416,10 @@ func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
 func (srv *Server) handleUndelayPeerPortTxRx() (*rpcpb.Response, error) {
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("undelaying", zap.Int("peer-port", port))
+		srv.lg.Info("undelaying", zap.Int("peer-port", port))
 		px.UndelayTx()
 		px.UndelayRx()
-		srv.logger.Info("undelayed", zap.Int("peer-port", port))
+		srv.lg.Info("undelayed", zap.Int("peer-port", port))
 	}
 	return &rpcpb.Response{
 		Success: true,
diff --git a/tools/functional-tester/agent/server.go b/tools/functional-tester/agent/server.go
index c3239a441..d693a961b 100644
--- a/tools/functional-tester/agent/server.go
+++ b/tools/functional-tester/agent/server.go
@@ -34,7 +34,7 @@ import (
 // serialized in tester-side
 type Server struct {
 	grpcServer *grpc.Server
-	logger     *zap.Logger
+	lg         *zap.Logger
 
 	network string
 	address string
@@ -56,12 +56,12 @@ type Server struct {
 
 // NewServer returns a new agent server.
 func NewServer(
-	logger *zap.Logger,
+	lg *zap.Logger,
 	network string,
 	address string,
 ) *Server {
 	return &Server{
-		logger:  logger,
+		lg:      lg,
 		network: network,
 		address: address,
 		last:    rpcpb.Operation_NotStarted,
@@ -93,34 +93,33 @@ func (srv *Server) StartServe() error {
 
 	rpcpb.RegisterTransportServer(srv.grpcServer, srv)
 
-	srv.logger.Info(
+	srv.lg.Info(
 		"gRPC server started",
 		zap.String("address", srv.address),
 		zap.String("listener-address", srv.ln.Addr().String()),
 	)
 	err = srv.grpcServer.Serve(srv.ln)
 	if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
-		srv.logger.Info(
+		srv.lg.Info(
 			"gRPC server is shut down",
 			zap.String("address", srv.address),
 			zap.Error(err),
 		)
 	} else {
-		srv.logger.Warn(
+		srv.lg.Warn(
 			"gRPC server returned with error",
 			zap.String("address", srv.address),
 			zap.Error(err),
 		)
 	}
-
 	return err
 }
 
 // Stop stops serving gRPC server.
 func (srv *Server) Stop() {
-	srv.logger.Info("gRPC server stopping", zap.String("address", srv.address))
+	srv.lg.Info("gRPC server stopping", zap.String("address", srv.address))
 	srv.grpcServer.Stop()
-	srv.logger.Info("gRPC server stopped", zap.String("address", srv.address))
+	srv.lg.Info("gRPC server stopped", zap.String("address", srv.address))
 }
 
 // Transport communicates with etcd tester.
diff --git a/tools/functional-tester/rpcpb/member.go b/tools/functional-tester/rpcpb/member.go
index 5a81e6f0a..f82fd4d9e 100644
--- a/tools/functional-tester/rpcpb/member.go
+++ b/tools/functional-tester/rpcpb/member.go
@@ -32,12 +32,12 @@ var dialOpts = []grpc.DialOption{
 }
 
 // DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
-func (m *Member) DialEtcdGRPCServer() (*grpc.ClientConn, error) {
+func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
 	if m.EtcdClientTLS {
 		// TODO: support TLS
 		panic("client TLS not supported yet")
 	}
-	return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
+	return grpc.Dial(m.EtcdClientEndpoint, append(dialOpts, opts...)...)
 }
 
 // CreateEtcdClient creates a client from member.
diff --git a/tools/functional-tester/tester/checks.go b/tools/functional-tester/tester/checks.go
index 8817bcca4..6c1c0a75b 100644
--- a/tools/functional-tester/tester/checks.go
+++ b/tools/functional-tester/tester/checks.go
@@ -22,6 +22,7 @@ import (
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -29,6 +30,7 @@ import (
 
 const retries = 7
 
+// Checker checks cluster consistency.
 type Checker interface {
 	// Check returns an error if the system fails a consistency check.
 	Check() error
@@ -39,14 +41,14 @@ type hashAndRevGetter interface {
 }
 
 type hashChecker struct {
-	logger *zap.Logger
-	hrg    hashAndRevGetter
+	lg  *zap.Logger
+	hrg hashAndRevGetter
 }
 
-func newHashChecker(logger *zap.Logger, hrg hashAndRevGetter) Checker {
+func newHashChecker(lg *zap.Logger, hrg hashAndRevGetter) Checker {
 	return &hashChecker{
-		logger: logger,
-		hrg:    hrg,
+		lg:  lg,
+		hrg: hrg,
 	}
 }
 
@@ -57,12 +59,11 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
 		revs   map[string]int64
 		hashes map[string]int64
 	)
-
 	// retries in case of transient failure or etcd cluster has not stablized yet.
 	for i := 0; i < retries; i++ {
 		revs, hashes, err = hc.hrg.getRevisionHash()
 		if err != nil {
-			hc.logger.Warn(
+			hc.lg.Warn(
 				"failed to get revision and hash",
 				zap.Int("retries", i),
 				zap.Error(err),
@@ -73,7 +74,7 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
 		if sameRev && sameHashes {
 			return nil
 		}
-		hc.logger.Warn(
+		hc.lg.Warn(
 			"retrying; etcd cluster is not stable",
 			zap.Int("retries", i),
 			zap.Bool("same-revisions", sameRev),
@@ -97,19 +98,17 @@ func (hc *hashChecker) Check() error {
 }
 
 type leaseChecker struct {
-	logger *zap.Logger
-
-	endpoint string // TODO: use Member
-
-	ls          *leaseStresser
-	leaseClient pb.LeaseClient
-	kvc         pb.KVClient
+	lg  *zap.Logger
+	m   *rpcpb.Member
+	ls  *leaseStresser
+	lsc pb.LeaseClient
+	kvc pb.KVClient
 }
 
 func (lc *leaseChecker) Check() error {
-	conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1))
+	conn, err := lc.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
 	if err != nil {
-		return fmt.Errorf("%v (%s)", err, lc.ls.endpoint)
+		return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
 	}
 	defer func() {
 		if conn != nil {
@@ -117,7 +116,7 @@ func (lc *leaseChecker) Check() error {
 		}
 	}()
 	lc.kvc = pb.NewKVClient(conn)
-	lc.leaseClient = pb.NewLeaseClient(conn)
+	lc.lsc = pb.NewLeaseClient(conn)
 	if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
 		return err
 	}
@@ -157,7 +156,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 			return nil
 		}
 		if err != nil {
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"retrying; Lease TimeToLive failed",
 				zap.Int("retries", i),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -167,7 +166,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 		}
 		if resp.TTL > 0 {
 			dur := time.Duration(resp.TTL) * time.Second
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"lease has not been expired, wait until expire",
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Int64("ttl", resp.TTL),
@@ -175,7 +174,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 			)
 			time.Sleep(dur)
 		} else {
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"lease expired but not yet revoked",
 				zap.Int("retries", i),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -195,18 +194,18 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
 	keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return err
 	}
 	leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return err
@@ -233,7 +232,7 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
 
 func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
 	ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
-	return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
+	return lc.lsc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
 }
 
 func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
@@ -248,9 +247,9 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
 		} else {
 			return resp.TTL == -1, nil
 		}
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasLeaseExpired getLeaseByID failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			zap.Error(err),
 		)
@@ -267,9 +266,9 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease
 		RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
 	}, grpc.FailFast(false))
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			zap.Error(err),
 		)
diff --git a/tools/functional-tester/tester/cluster.go b/tools/functional-tester/tester/cluster.go
index f94287515..60b14097b 100644
--- a/tools/functional-tester/tester/cluster.go
+++ b/tools/functional-tester/tester/cluster.go
@@ -38,7 +38,7 @@ import (
 
 // Cluster defines tester cluster.
 type Cluster struct {
-	logger *zap.Logger
+	lg *zap.Logger
 
 	agentConns   []*grpc.ClientConn
 	agentClients []rpcpb.TransportClient
@@ -61,15 +61,15 @@ type Cluster struct {
 	cs int
 }
 
-func newCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
-	logger.Info("reading configuration file", zap.String("path", fpath))
+func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
+	lg.Info("reading configuration file", zap.String("path", fpath))
 	bts, err := ioutil.ReadFile(fpath)
 	if err != nil {
 		return nil, err
 	}
-	logger.Info("opened configuration file", zap.String("path", fpath))
+	lg.Info("opened configuration file", zap.String("path", fpath))
 
-	clus := &Cluster{logger: logger}
+	clus := &Cluster{lg: lg}
 	if err = yaml.Unmarshal(bts, clus); err != nil {
 		return nil, err
 	}
@@ -192,8 +192,8 @@ var dialOpts = []grpc.DialOption{
 }
 
 // NewCluster creates a client from a tester configuration.
-func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
-	clus, err := newCluster(logger, fpath)
+func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
+	clus, err := newCluster(lg, fpath)
 	if err != nil {
 		return nil, err
 	}
@@ -205,21 +205,21 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
 	clus.failures = make([]Failure, 0)
 
 	for i, ap := range clus.Members {
-		logger.Info("connecting", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr))
 		var err error
 		clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
 		if err != nil {
 			return nil, err
 		}
 		clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
-		logger.Info("connected", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
 
-		logger.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
 		clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
 		if err != nil {
 			return nil, err
 		}
-		logger.Info("created stream", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
 	}
 
 	mux := http.NewServeMux()
@@ -246,18 +246,18 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
 }
 
 func (clus *Cluster) serveTesterServer() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"started tester HTTP server",
 		zap.String("tester-address", clus.Tester.TesterAddr),
 	)
 	err := clus.testerHTTPServer.ListenAndServe()
-	clus.logger.Info(
+	clus.lg.Info(
 		"tester HTTP server returned",
 		zap.String("tester-address", clus.Tester.TesterAddr),
 		zap.Error(err),
 	)
 	if err != nil && err != http.ErrServerClosed {
-		clus.logger.Fatal("tester HTTP errored", zap.Error(err))
+		clus.lg.Fatal("tester HTTP errored", zap.Error(err))
 	}
 }
 
@@ -291,7 +291,7 @@ func (clus *Cluster) updateFailures() {
 		case "FAILPOINTS":
 			fpFailures, fperr := failpointFailures(clus)
 			if len(fpFailures) == 0 {
-				clus.logger.Info("no failpoints found!", zap.Error(fperr))
+				clus.lg.Info("no failpoints found!", zap.Error(fperr))
 			}
 			clus.failures = append(clus.failures, fpFailures...)
 		case "NO_FAIL":
@@ -316,13 +316,13 @@ func (clus *Cluster) shuffleFailures() {
 	n := len(clus.failures)
 	cp := coprime(n)
 
-	clus.logger.Info("shuffling test failure cases", zap.Int("total", n))
+	clus.lg.Info("shuffling test failure cases", zap.Int("total", n))
 	fs := make([]Failure, n)
 	for i := 0; i < n; i++ {
 		fs[i] = clus.failures[(cp*i+offset)%n]
 	}
 	clus.failures = fs
-	clus.logger.Info("shuffled test failure cases", zap.Int("total", n))
+	clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
 }
 
 /*
@@ -354,7 +354,7 @@ func gcd(x, y int) int {
 }
 
 func (clus *Cluster) updateStresserChecker() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"updating stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -367,7 +367,7 @@
 	clus.stresser = cs
 
 	if clus.Tester.ConsistencyCheck {
-		clus.checker = newHashChecker(clus.logger, hashAndRevGetter(clus))
+		clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
 		if schk := cs.Checker(); schk != nil {
 			clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
 		}
@@ -375,7 +375,7 @@
 		clus.checker = newNoChecker()
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"updated stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -383,13 +383,13 @@
 }
 
 func (clus *Cluster) startStresser() (err error) {
-	clus.logger.Info(
+	clus.lg.Info(
 		"starting stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	err = clus.stresser.Stress()
-	clus.logger.Info(
+	clus.lg.Info(
 		"started stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -398,13 +398,13 @@ func (clus *Cluster) startStresser() (err error) {
 }
 
 func (clus *Cluster) closeStresser() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"closing stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	clus.stresser.Close()
-	clus.logger.Info(
+	clus.lg.Info(
 		"closed stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -412,13 +412,13 @@ func (clus *Cluster) closeStresser() {
 }
 
 func (clus *Cluster) pauseStresser() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"pausing stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	clus.stresser.Pause()
-	clus.logger.Info(
+	clus.lg.Info(
 		"paused stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -431,7 +431,7 @@ func (clus *Cluster) checkConsistency() (err error) {
 		return
 	}
 	if err = clus.updateRevision(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"updateRevision failed",
 			zap.Error(err),
 		)
@@ -440,20 +440,20 @@ func (clus *Cluster) checkConsistency() (err error) {
 		err = clus.startStresser()
 	}()
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"checking consistency and invariant of cluster",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 		zap.String("desc", clus.failures[clus.cs].Desc()),
 	)
 	if err = clus.checker.Check(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"checker.Check failed",
 			zap.Error(err),
 		)
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"checked consistency and invariant of cluster",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -488,7 +488,7 @@ func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
 			strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
 			// agent server has already closed;
 			// so this error is expected
-			clus.logger.Info(
+			clus.lg.Info(
 				"successfully destroyed",
 				zap.String("member", clus.Members[i].EtcdClientEndpoint),
 			)
@@ -511,13 +511,13 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		clus.agentRequests[idx].Operation = op
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"sending request",
 		zap.String("operation", op.String()),
 		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
 	)
 	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
-	clus.logger.Info(
+	clus.lg.Info(
 		"sent request",
 		zap.String("operation", op.String()),
 		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
@@ -527,14 +527,14 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		return err
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"receiving response",
 		zap.String("operation", op.String()),
 		zap.String("from", clus.Members[idx].EtcdClientEndpoint),
 	)
 	resp, err := clus.agentStreams[idx].Recv()
 	if resp != nil {
-		clus.logger.Info(
+		clus.lg.Info(
 			"received response",
 			zap.String("operation", op.String()),
 			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
@@ -543,7 +543,7 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 			zap.Error(err),
 		)
 	} else {
-		clus.logger.Info(
+		clus.lg.Info(
 			"received empty response",
 			zap.String("operation", op.String()),
 			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
@@ -562,26 +562,26 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 
 // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
 func (clus *Cluster) DestroyEtcdAgents() {
-	clus.logger.Info("destroying etcd servers and agents")
+	clus.lg.Info("destroying etcd servers and agents")
 	err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
 	if err != nil {
-		clus.logger.Warn("failed to destroy etcd servers and agents", zap.Error(err))
+		clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err))
 	} else {
-		clus.logger.Info("destroyed etcd servers and agents")
+		clus.lg.Info("destroyed etcd servers and agents")
 	}
 
 	for i, conn := range clus.agentConns {
-		clus.logger.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
+		clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
 		err := conn.Close()
-		clus.logger.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
+		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
 	}
 
 	if clus.testerHTTPServer != nil {
-		clus.logger.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
+		clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
 		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 		err := clus.testerHTTPServer.Shutdown(ctx)
 		cancel()
-		clus.logger.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
+		clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
 	}
 }
 
@@ -595,13 +595,13 @@ func (clus *Cluster) WaitHealth() error {
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
 	for i := 0; i < 60; i++ {
 		for _, m := range clus.Members {
-			clus.logger.Info(
+			clus.lg.Info(
 				"writing health key",
 				zap.Int("retries", i),
 				zap.String("endpoint", m.EtcdClientEndpoint),
 			)
 			if err = m.WriteHealthKey(); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"writing health key failed",
 					zap.Int("retries", i),
 					zap.String("endpoint", m.EtcdClientEndpoint),
@@ -609,14 +609,14 @@ func (clus *Cluster) WaitHealth() error {
 				)
 				break
 			}
-			clus.logger.Info(
+			clus.lg.Info(
 				"wrote health key",
 				zap.Int("retries", i),
 				zap.String("endpoint", m.EtcdClientEndpoint),
 			)
 		}
 		if err == nil {
-			clus.logger.Info(
+			clus.lg.Info(
 				"writing health key success on all members",
 				zap.Int("retries", i),
 			)
@@ -683,7 +683,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 	for i, m := range clus.Members {
 		conn, derr := m.DialEtcdGRPCServer()
 		if derr != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"compactKV dial failed",
 				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Error(derr),
@@ -693,7 +693,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		}
 		kvc := pb.NewKVClient(conn)
 
-		clus.logger.Info(
+		clus.lg.Info(
 			"compacting",
 			zap.String("endpoint", m.EtcdClientEndpoint),
 			zap.Int64("compact-revision", rev),
@@ -709,14 +709,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		succeed := true
 		if cerr != nil {
 			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
-				clus.logger.Info(
+				clus.lg.Info(
 					"compact error is ignored",
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Int64("compact-revision", rev),
 					zap.Error(cerr),
 				)
 			} else {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"compact failed",
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Int64("compact-revision", rev),
@@ -728,7 +728,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		}
 
 		if succeed {
-			clus.logger.Info(
+			clus.lg.Info(
 				"compacted",
 				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Int64("compact-revision", rev),
@@ -753,14 +753,14 @@ func (clus *Cluster) checkCompact(rev int64) error {
 }
 
 func (clus *Cluster) defrag() error {
-	clus.logger.Info(
+	clus.lg.Info(
 		"defragmenting",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	for _, m := range clus.Members {
 		if err := m.Defrag(); err != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"defrag failed",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
@@ -769,7 +769,7 @@ func (clus *Cluster) defrag() error {
 			return err
 		}
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"defragmented",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
diff --git a/tools/functional-tester/tester/cluster_test.go b/tools/functional-tester/tester/cluster_test.go
index eab971142..a6ac44c21 100644
--- a/tools/functional-tester/tester/cluster_test.go
+++ b/tools/functional-tester/tester/cluster_test.go
@@ -156,13 +156,13 @@ func Test_newCluster(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	cfg.logger = nil
+	cfg.lg = nil
 
 	if !reflect.DeepEqual(exp, cfg) {
 		t.Fatalf("expected %+v, got %+v", exp, cfg)
 	}
 
-	cfg.logger = logger
+	cfg.lg = logger
 
 	cfg.updateFailures()
 	fs1 := cfg.failureStrings()
diff --git a/tools/functional-tester/tester/cluster_tester.go b/tools/functional-tester/tester/cluster_tester.go
index 8e2b4662c..9d6928ce8 100644
--- a/tools/functional-tester/tester/cluster_tester.go
+++ b/tools/functional-tester/tester/cluster_tester.go
@@ -37,7 +37,7 @@ func (clus *Cluster) StartTester() {
 		clus.rd = round
 
 		if err := clus.doRound(); err != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"doRound failed; returning",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
@@ -60,21 +60,21 @@ func (clus *Cluster) StartTester() {
 			preModifiedKey = currentModifiedKey
 			timeout := 10 * time.Second
 			timeout += time.Duration(modifiedKey/compactQPS) * time.Second
-			clus.logger.Info(
+			clus.lg.Info(
 				"compacting",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
 				zap.Duration("timeout", timeout),
 			)
 			if err := clus.compact(revToCompact, timeout); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"compact failed",
 					zap.Int("round", clus.rd),
 					zap.Int("case", clus.cs),
 					zap.Error(err),
 				)
 				if err = clus.cleanup(); err != nil {
-					clus.logger.Warn(
+					clus.lg.Warn(
 						"cleanup failed",
 						zap.Int("round", clus.rd),
 						zap.Int("case", clus.cs),
@@ -87,7 +87,7 @@ func (clus *Cluster) StartTester() {
 		}
 		if round > 0 && round%500 == 0 { // every 500 rounds
 			if err := clus.defrag(); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"defrag failed; returning",
 					zap.Int("round", clus.rd),
 					zap.Int("case", clus.cs),
@@ -99,7 +99,7 @@ func (clus *Cluster) StartTester() {
 		}
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"functional-tester passed",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -111,7 +111,7 @@ func (clus *Cluster) doRound() error {
 		clus.shuffleFailures()
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"starting round",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
@@ -121,12 +121,12 @@ func (clus *Cluster) doRound() error {
 
 		caseTotalCounter.WithLabelValues(f.Desc()).Inc()
 
-		clus.logger.Info("wait health before injecting failures")
+		clus.lg.Info("wait health before injecting failures")
 		if err := clus.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
 
-		clus.logger.Info(
+		clus.lg.Info(
 			"injecting failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -135,7 +135,7 @@ func (clus *Cluster) doRound() error {
 		if err := f.Inject(clus); err != nil {
 			return fmt.Errorf("injection error: %v", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"injected failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -145,7 +145,7 @@ func (clus *Cluster) doRound() error {
 		// if run local, recovering server may conflict
 		// with stressing client ports
 		// TODO: use unix for local tests
-		clus.logger.Info(
+		clus.lg.Info(
 			"recovering failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -154,26 +154,26 @@ func (clus *Cluster) doRound() error {
 		if err := f.Recover(clus); err != nil {
 			return fmt.Errorf("recovery error: %v", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"recovered failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.String("desc", f.Desc()),
 		)
 
-		clus.logger.Info("pausing stresser after failure recovery, before wait health")
+		clus.lg.Info("pausing stresser after failure recovery, before wait health")
 		clus.pauseStresser()
 
-		clus.logger.Info("wait health after recovering failures")
+		clus.lg.Info("wait health after recovering failures")
 		if err := clus.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
 
-		clus.logger.Info("check consistency after recovering failures")
+		clus.lg.Info("check consistency after recovering failures")
 		if err := clus.checkConsistency(); err != nil {
 			return fmt.Errorf("tt.checkConsistency error (%v)", err)
 		}
 
-		clus.logger.Info(
+		clus.lg.Info(
 			"failure case passed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -181,7 +181,7 @@ func (clus *Cluster) doRound() error {
 		)
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"finished round",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
@@ -196,7 +196,7 @@ func (clus *Cluster) updateRevision() error {
 		break // just need get one of the current revisions
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"updated current revision",
 		zap.Int64("current-revision", clus.currentRevision),
 	)
@@ -204,7 +204,7 @@ func (clus *Cluster) updateRevision() error {
 }
 
 func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
-	clus.logger.Info("pausing stresser before compact")
+	clus.lg.Info("pausing stresser before compact")
 	clus.pauseStresser()
 	defer func() {
 		if err == nil {
@@ -212,7 +212,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 		}
 	}()
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"compacting storage",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
@@ -220,19 +220,19 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 	if err = clus.compactKV(rev, timeout); err != nil {
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"compacted storage",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"checking compaction",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)
 	if err = clus.checkCompact(rev); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"checkCompact failed",
 			zap.Int64("current-revision", clus.currentRevision),
 			zap.Int64("compact-revision", rev),
@@ -240,7 +240,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 		)
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"confirmed compaction",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
@@ -254,7 +254,7 @@ func (clus *Cluster) failed() {
 		return
 	}
 
-	clus.logger.Info(
+	clus.lg.Info(
 		"exiting on failure",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -275,7 +275,7 @@ func (clus *Cluster) cleanup() error {
 	clus.closeStresser()
 
 	if err := clus.FailArchive(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"cleanup failed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -284,7 +284,7 @@ func (clus *Cluster) cleanup() error {
 		return err
 	}
 	if err := clus.Restart(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"restart failed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
diff --git a/tools/functional-tester/tester/failure_case_delay.go b/tools/functional-tester/tester/failure_case_delay.go
index ba6d5a291..882a61888 100644
--- a/tools/functional-tester/tester/failure_case_delay.go
+++ b/tools/functional-tester/tester/failure_case_delay.go
@@ -30,7 +30,7 @@ func (f *failureDelay) Inject(clus *Cluster) error {
 		return err
 	}
 	if f.delayDuration > 0 {
-		clus.logger.Info(
+		clus.lg.Info(
 			"sleeping in failureDelay",
 			zap.Duration("delay", f.delayDuration),
 			zap.String("case", f.Failure.Desc()),
diff --git a/tools/functional-tester/tester/stress.go b/tools/functional-tester/tester/stress.go
index d23a88e9b..421a6d93f 100644
--- a/tools/functional-tester/tester/stress.go
+++ b/tools/functional-tester/tester/stress.go
@@ -21,6 +21,7 @@ import (
 	"go.uber.org/zap"
 )
 
+// Stresser defines stressing client operations.
 type Stresser interface {
 	// Stress starts to stress the etcd cluster
 	Stress() error
@@ -38,7 +39,7 @@ type Stresser interface {
 func newStresser(clus *Cluster, idx int) Stresser {
 	stressers := make([]Stresser, len(clus.Tester.StressTypes))
 	for i, stype := range clus.Tester.StressTypes {
-		clus.logger.Info("creating stresser", zap.String("type", stype))
+		clus.lg.Info("creating stresser", zap.String("type", stype))
 
 		switch stype {
 		case "NO_STRESS":
@@ -48,8 +49,8 @@ func newStresser(clus *Cluster, idx int) Stresser {
 			// TODO: Too intensive stressing clients can panic etcd member with
 			// 'out of memory' error. Put rate limits in server side.
 			stressers[i] = &keyStresser{
-				logger:         clus.logger,
-				Endpoint:       clus.Members[idx].EtcdClientEndpoint,
+				lg:             clus.lg,
+				m:              clus.Members[idx],
 				keySize:        int(clus.Tester.StressKeySize),
 				keyLargeSize:   int(clus.Tester.StressKeySizeLarge),
 				keySuffixRange: int(clus.Tester.StressKeySuffixRange),
@@ -61,8 +62,8 @@ func newStresser(clus *Cluster, idx int) Stresser {
 
 		case "LEASE":
 			stressers[i] = &leaseStresser{
-				logger:       clus.logger,
-				endpoint:     clus.Members[idx].EtcdClientEndpoint,
+				lg:           clus.lg,
+				m:            clus.Members[idx],
 				numLeases:    10, // TODO: configurable
 				keysPerLease: 10, // TODO: configurable
 				rateLimiter:  clus.rateLimiter,
diff --git a/tools/functional-tester/tester/stress_key.go b/tools/functional-tester/tester/stress_key.go
index fc291beec..9f795b937 100644
--- a/tools/functional-tester/tester/stress_key.go
+++ b/tools/functional-tester/tester/stress_key.go
@@ -25,6 +25,7 @@ import (
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
@@ -33,9 +34,9 @@ import (
 )
 
 type keyStresser struct {
-	logger *zap.Logger
+	lg *zap.Logger
 
-	Endpoint string // TODO: use Member
+	m *rpcpb.Member
 
 	keySize        int
 	keyLargeSize   int
@@ -59,9 +60,9 @@ type keyStresser struct {
 
 func (s *keyStresser) Stress() error {
 	// TODO: add backoff option
-	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
+	conn, err := s.m.DialEtcdGRPCServer()
 	if err != nil {
-		return fmt.Errorf("%v (%s)", err, s.Endpoint)
+		return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -96,9 +97,9 @@ func (s *keyStresser) Stress() error {
 		go s.run(ctx)
 	}
 
-	s.logger.Info(
+	s.lg.Info(
 		"key stresser started in background",
-		zap.String("endpoint", s.Endpoint),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	return nil
 }
@@ -150,9 +151,9 @@ func (s *keyStresser) run(ctx context.Context) {
 			// from stresser.Cancel method:
 			return
 		default:
-			s.logger.Warn(
+			s.lg.Warn(
 				"key stresser exited with error",
-				zap.String("endpoint", s.Endpoint),
+				zap.String("endpoint", s.m.EtcdClientEndpoint),
 				zap.Error(err),
 			)
 			return
@@ -169,9 +170,9 @@ func (s *keyStresser) Close() {
 	s.conn.Close()
 	s.wg.Wait()
 
-	s.logger.Info(
+	s.lg.Info(
 		"key stresser is closed",
-		zap.String("endpoint", s.Endpoint),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 }
diff --git a/tools/functional-tester/tester/stress_lease.go b/tools/functional-tester/tester/stress_lease.go
index bf65d294a..890992f75 100644
--- a/tools/functional-tester/tester/stress_lease.go
+++ b/tools/functional-tester/tester/stress_lease.go
@@ -24,6 +24,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
@@ -37,14 +38,14 @@ const (
 )
 
 type leaseStresser struct {
-	logger *zap.Logger
+	lg *zap.Logger
 
-	endpoint string
-	cancel   func()
-	conn     *grpc.ClientConn
-	kvc      pb.KVClient
-	lc       pb.LeaseClient
-	ctx      context.Context
+	m      *rpcpb.Member
+	cancel func()
+	conn   *grpc.ClientConn
+	kvc    pb.KVClient
+	lc     pb.LeaseClient
+	ctx    context.Context
 
 	rateLimiter *rate.Limiter
 	// atomicModifiedKey records the number of keys created and deleted during a test case
@@ -122,18 +123,18 @@ func (ls *leaseStresser) setupOnce() error {
 }
 
 func (ls *leaseStresser) Stress() error {
-	ls.logger.Info(
+	ls.lg.Info(
 		"lease stresser is started",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 
 	if err := ls.setupOnce(); err != nil {
 		return err
 	}
 
-	conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
+	conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second))
 	if err != nil {
-		return fmt.Errorf("%v (%s)", err, ls.endpoint)
+		return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
 	}
 	ls.conn = conn
 	ls.kvc = pb.NewKVClient(conn)
@@ -161,24 +162,24 @@ func (ls *leaseStresser) run() {
 			return
 		}
 
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"lease stresser is creating leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 		ls.createLeases()
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"lease stresser created leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"lease stresser is dropped leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 		ls.randomlyDropLeases()
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"lease stresser dropped leases",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		)
 	}
 }
@@ -206,9 +207,9 @@ func (ls *leaseStresser) createAliveLeases() {
 			defer wg.Done()
 			leaseID, err := ls.createLeaseWithKeys(TTL)
 			if err != nil {
-				ls.logger.Debug(
+				ls.lg.Debug(
 					"createLeaseWithKeys failed",
-					zap.String("endpoint", ls.endpoint),
+					zap.String("endpoint", ls.m.EtcdClientEndpoint),
 					zap.Error(err),
 				)
 				return
@@ -244,17 +245,17 @@ func (ls *leaseStresser) createShortLivedLeases() {
 func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
 	leaseID, err := ls.createLease(ttl)
 	if err != nil {
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"createLease failed",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return -1, err
 	}
 
-	ls.logger.Debug(
+	ls.lg.Debug(
 		"createLease created lease",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 	)
 	if err := ls.attachKeysWithLease(leaseID); err != nil {
@@ -273,9 +274,9 @@ func (ls *leaseStresser) randomlyDropLeases() {
 			// if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
 			// because we can't tell whether the lease is dropped or not.
 			if err != nil {
-				ls.logger.Debug(
+				ls.lg.Debug(
 					"randomlyDropLease failed",
-					zap.String("endpoint", ls.endpoint),
+					zap.String("endpoint", ls.m.EtcdClientEndpoint),
 					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 					zap.Error(err),
 				)
@@ -285,9 +286,9 @@ func (ls *leaseStresser) randomlyDropLeases() {
 			if !dropped {
 				return
 			}
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"randomlyDropLease dropped a lease",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			)
 			ls.revokedLeases.add(leaseID, time.Now())
@@ -314,9 +315,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		select {
 		case <-time.After(500 * time.Millisecond):
 		case <-ls.ctx.Done():
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive context canceled",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(ls.ctx.Err()),
 			)
@@ -328,9 +329,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			renewTime, ok := ls.aliveLeases.read(leaseID)
 			if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
 				ls.aliveLeases.remove(leaseID)
-				ls.logger.Debug(
+				ls.lg.Debug(
 					"keepLeaseAlive lease has not been renewed, dropped it",
-					zap.String("endpoint", ls.endpoint),
+					zap.String("endpoint", ls.m.EtcdClientEndpoint),
 					zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				)
 			}
@@ -338,9 +339,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		}
 
 		if err != nil {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive lease creates stream error",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(err),
 			)
@@ -351,32 +352,32 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			continue
 		}
 
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"keepLeaseAlive stream sends lease keepalive request",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
 		if err != nil {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive stream failed to send lease keepalive request",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(err),
 			)
 			continue
 		}
 		leaseRenewTime := time.Now()
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"keepLeaseAlive stream sent lease keepalive request",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		respRC, err := stream.Recv()
 		if err != nil {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive stream failed to receive lease keepalive response",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(err),
 			)
@@ -385,9 +386,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		// lease expires after TTL become 0
 		// don't send keepalive if the lease has expired
 		if respRC.TTL <= 0 {
-			ls.logger.Debug(
+			ls.lg.Debug(
 				"keepLeaseAlive stream received lease keepalive response TTL <= 0",
-				zap.String("endpoint", ls.endpoint),
+				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Int64("ttl", respRC.TTL),
 			)
@@ -395,9 +396,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			return
 		}
 		// renew lease timestamp only if lease is present
-		ls.logger.Debug(
+		ls.lg.Debug(
 			"keepLeaseAlive renewed a lease",
-			zap.String("endpoint", ls.endpoint),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
 		ls.aliveLeases.update(leaseID, leaseRenewTime)
@@ -444,31 +445,29 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
 		}
 	}
 
-	ls.logger.Debug(
+	ls.lg.Debug(
 		"randomlyDropLease error",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		zap.Error(ls.ctx.Err()),
 	)
 	return false, ls.ctx.Err()
 }
 
-func (ls *leaseStresser) Pause() {
-	ls.Close()
-}
+func (ls *leaseStresser) Pause() { ls.Close() }
 
 func (ls *leaseStresser) Close() {
-	ls.logger.Info(
+	ls.lg.Info(
 		"lease stresser is closing",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 	ls.cancel()
 	ls.runWg.Wait()
 	ls.aliveWg.Wait()
 	ls.conn.Close()
-	ls.logger.Info(
+	ls.lg.Info(
 		"lease stresser is closed",
-		zap.String("endpoint", ls.endpoint),
+		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 }
 
@@ -477,9 +476,5 @@ func (ls *leaseStresser) ModifiedKeys() int64 {
 }
 
 func (ls *leaseStresser) Checker() Checker {
-	return &leaseChecker{
-		logger:   ls.logger,
-		endpoint: ls.endpoint,
-		ls:       ls,
-	}
+	return &leaseChecker{lg: ls.lg, m: ls.m, ls: ls}
 }
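Note (not part of the patch): the `DialEtcdGRPCServer` signature change in `rpcpb/member.go` above is what lets the checker and stressers drop their raw `grpc.Dial` calls. A minimal sketch of how a caller might layer an extra `grpc.DialOption` on top of the package-level `dialOpts`; the endpoint value here is a placeholder for illustration:

```go
package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder endpoint; a real Member comes from the tester configuration.
	m := &rpcpb.Member{EtcdClientEndpoint: "localhost:2379"}

	// Extra options are appended after the package defaults, e.g. capping
	// gRPC's reconnect backoff at one second, as the lease checker and
	// lease stresser now do in this patch.
	conn, err := m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
	if err != nil {
		fmt.Printf("dial failed: %v\n", err)
		return
	}
	defer conn.Close()
}
```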